Brett Konold created SAMZA-2506:
-----------------------------------

             Summary: Inconsistent End of stream offset semantics in 
SystemStreamPartitionMetadata offsets
                 Key: SAMZA-2506
                 URL: https://issues.apache.org/jira/browse/SAMZA-2506
             Project: Samza
          Issue Type: New Feature
            Reporter: Brett Konold


When consuming a bounded stream (HdfsSystemConsumer or InMemorySystemConsumer), 
an IncomingMessageEnvelope with object = EndOfStreamMessage and offset = 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET is used to delimit the stream and 
signal that there is no more data to be consumed.
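
As a minimal sketch of that delimiting behaviour (not the Samza implementation), the following uses a simplified stand-in envelope. The Envelope class, the boundedBuffer and consume helpers, and the literal value of the marker offset are all assumptions made for illustration; only the idea of a trailing end-of-stream envelope comes from the description above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EndOfStreamSketch {
    // Stand-in for IncomingMessageEnvelope.END_OF_STREAM_OFFSET; the literal value is an assumption.
    static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

    // Hypothetical, simplified envelope: an offset plus a payload.
    static final class Envelope {
        final String offset;
        final Object message;

        Envelope(String offset, Object message) {
            this.offset = offset;
            this.message = message;
        }

        boolean isEndOfStream() {
            return END_OF_STREAM_OFFSET.equals(offset);
        }
    }

    // Builds a bounded buffer: data at offsets 0..n-1, followed by the end-of-stream marker.
    static List<Envelope> boundedBuffer(List<String> messages) {
        List<Envelope> buffer = new ArrayList<>();
        for (int i = 0; i < messages.size(); i++) {
            buffer.add(new Envelope(String.valueOf(i), messages.get(i)));
        }
        buffer.add(new Envelope(END_OF_STREAM_OFFSET, null));
        return buffer;
    }

    // Counts the data messages seen before the end-of-stream marker is reached.
    static int consume(List<Envelope> buffer) {
        int consumed = 0;
        for (Envelope envelope : buffer) {
            if (envelope.isEndOfStream()) {
                break; // nothing more to read on this partition
            }
            consumed++;
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<Envelope> buffer = boundedBuffer(Arrays.asList("a", "b"));
        System.out.println("data messages consumed: " + consume(buffer));
    }
}
```

Note that the marker occupies a slot in the buffer but has no numeric offset of its own, which is where the question of whether it should count towards newestOffset or upcomingOffset arises.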

It is problematic, however, that InMemorySystemAdmin, HdfsSystemAdmin, and 
SystemStreamPartitionMetadata each give "oldestOffset", "newestOffset", and 
"upcomingOffset" a different meaning.

As described in SystemStreamPartitionMetadata:
{code:java}
/**
 * @return The oldest offset that still exists in the stream for the
 *         partition given. If a partition has two messages with offsets 0
 *         and 1, respectively, then this method would return 0 for the
 *         oldest offset. This offset is useful when one wishes to read all
 *         messages in a stream from the very beginning. A null value means
 *         the stream is empty.
 */
public String getOldestOffset() {
  return oldestOffset;
}

/**
 * @return The newest offset that exists in the stream for the partition
 *         given. If a partition has two messages with offsets 0 and 1,
 *         respectively, then this method would return 1 for the newest
 *         offset. This offset is useful when one wishes to see if all
 *         messages have been read from a stream (offset of last message
 *         read == newest offset). A null value means the stream is empty.
 */
public String getNewestOffset() {
  return newestOffset;
}

/**
 * @return The offset that represents the next message to be written in the
 *         stream for the partition given. If a partition has two messages
 *         with offsets 0 and 1, respectively, then this method would return
 *         2 for the upcoming offset. This offset is useful when one wishes
 *         to pick up reading at the very end of a stream. A null value
 *         means the stream is empty.
 */
public String getUpcomingOffset() {
  return upcomingOffset;
}
{code}
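
Read literally, the Javadoc above implies the following values for a partition whose offsets are consecutive integers starting at 0. This is only a sketch of that reading: the Metadata holder and forMessageCount helper are hypothetical names, and the integer-offset assumption is taken from the Javadoc's own example rather than from any particular system.

```java
public class OffsetContractSketch {
    // Hypothetical value holder mirroring the three getters quoted above.
    static final class Metadata {
        final String oldestOffset;
        final String newestOffset;
        final String upcomingOffset;

        Metadata(String oldestOffset, String newestOffset, String upcomingOffset) {
            this.oldestOffset = oldestOffset;
            this.newestOffset = newestOffset;
            this.upcomingOffset = upcomingOffset;
        }
    }

    // Assumption: offsets are consecutive integers starting at 0, as in the Javadoc example.
    static Metadata forMessageCount(int messageCount) {
        if (messageCount < 0) {
            throw new IllegalArgumentException("messageCount must be >= 0");
        }
        if (messageCount == 0) {
            // The Javadoc states that a null value means the stream is empty.
            return new Metadata(null, null, null);
        }
        return new Metadata(
            "0",                               // oldest: first message still in the stream
            String.valueOf(messageCount - 1),  // newest: last message written
            String.valueOf(messageCount));     // upcoming: next message to be written
    }

    public static void main(String[] args) {
        Metadata m = forMessageCount(2);
        System.out.println(m.oldestOffset + " " + m.newestOffset + " " + m.upcomingOffset);
    }
}
```
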
In the case of HdfsSystemConsumer, messages are read from HDFS files until all 
have been read, [and then an end of stream envelope is appended to the end of the message 
buffer|https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java#L237].
The offset metadata returned per partition is:

oldestOffset: offset of the beginning of the _first_ file in HDFS

{color:#FF0000}newestOffset: offset of the beginning of the _last_ file in HDFS{color}
 * {color:#ff0000}Does not include the end of stream envelope in the offset count{color}

{color:#FF0000}upcomingOffset: null{color}
 * {color:#FF0000}Per SystemStreamPartitionMetadata, a null value is meant to 
indicate that the stream is empty.{color}

 

{color:#172b4d}Further differences are seen in InMemorySystemAdmin. When the 
in-memory system is used as a bounded stream, an IncomingMessageEnvelope with 
object = EndOfStreamMessage and offset = 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET is [placed at the end of the 
buffer|https://github.com/Sanil15/samza/blob/master/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java#L72],
 as in the HdfsSystemConsumer case above.{color}

{color:#172b4d}However, InMemorySystemAdmin will generate the following 
metadata:{color}

{color:#172b4d}oldestOffset: 0{color}

{color:#172b4d}newestOffset: numeric index of last message in buffer{color}

{color:#172b4d}upcomingOffset: newestOffset + 1{color}
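
One way to make the HDFS discrepancy concrete is a small check of the "null means empty" reading of the SystemStreamPartitionMetadata Javadoc. The sketch below is illustrative only: the class and method names are hypothetical, and the HDFS offset strings are opaque placeholders rather than real offset values.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetMetadataCheck {
    // Returns deviations from the reading that null is only valid when the stream is empty.
    static List<String> nullnessViolations(String oldest, String newest, String upcoming) {
        List<String> violations = new ArrayList<>();
        boolean anyPresent = oldest != null || newest != null || upcoming != null;
        if (!anyPresent) {
            return violations; // all null: consistently reports an empty stream
        }
        if (oldest == null) {
            violations.add("oldestOffset is null although the stream is reported as non-empty");
        }
        if (newest == null) {
            violations.add("newestOffset is null although the stream is reported as non-empty");
        }
        if (upcoming == null) {
            violations.add("upcomingOffset is null although the stream is reported as non-empty");
        }
        return violations;
    }

    public static void main(String[] args) {
        // Placeholder strings stand in for the HDFS file-start offsets described above.
        List<String> hdfs = nullnessViolations("first-file-start", "last-file-start", null);
        // In-memory style metadata for a buffer holding two data messages.
        List<String> inMemory = nullnessViolations("0", "1", "2");
        System.out.println("hdfs: " + hdfs);
        System.out.println("in-memory: " + inMemory);
    }
}
```

The in-memory values pass this particular check, but it says nothing about whether the end of stream envelope should be counted in newestOffset or upcomingOffset, which remains inconsistent between the two systems.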



