[ 
https://issues.apache.org/jira/browse/SAMZA-2506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brett Konold updated SAMZA-2506:
--------------------------------
    Description: 
When consuming a bounded stream (HdfsSystemConsumer or InMemorySystemConsumer), 
an IncomingMessageEnvelope with object = EndOfStreamMessage and offset = 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET is used to delimit the stream and 
signal that there is no more data to be consumed.

It is problematic, however, that InMemorySystemAdmin, HdfsSystemAdmin, and 
SystemStreamPartitionMetadata each use "oldestOffset", "newestOffset", and 
"upcomingOffset" differently.

As described in SystemStreamPartitionMetadata:
{code:java}
/**
 * @return The oldest offset that still exists in the stream for the
 *         partition given. If a partition has two messages with offsets 0
 *         and 1, respectively, then this method would return 0 for the
 *         oldest offset. This offset is useful when one wishes to read all
 *         messages in a stream from the very beginning. A null value means
 *         the stream is empty.
 */
public String getOldestOffset() {
  return oldestOffset;
}

/**
 * @return The newest offset that exists in the stream for the partition
 *         given. If a partition has two messages with offsets 0 and 1,
 *         respectively, then this method would return 1 for the newest
 *         offset. This offset is useful when one wishes to see if all
 *         messages have been read from a stream (offset of last message
 *         read == newest offset). A null value means the stream is empty.
 */
public String getNewestOffset() {
  return newestOffset;
}

/**
 * @return The offset that represents the next message to be written in the
 *         stream for the partition given. If a partition has two messages
 *         with offsets 0 and 1, respectively, then this method would return
 *         2 for the upcoming offset. This offset is useful when one wishes
 *         to pick up reading at the very end of a stream. A null value
 *         means the stream is empty.
 */
public String getUpcomingOffset() {
  return upcomingOffset;
}
{code}
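As a minimal sketch of the contract documented above (not Samza code), the three offsets can be derived from the message offsets currently in a partition. The class DocumentedOffsets and the method fromMessageOffsets are hypothetical names introduced only for this illustration, and returning null for all three offsets on an empty partition is a literal reading of the Javadoc rather than confirmed behavior.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; it is not part of Samza.
final class DocumentedOffsets {
    final String oldest;
    final String newest;
    final String upcoming;

    DocumentedOffsets(String oldest, String newest, String upcoming) {
        this.oldest = oldest;
        this.newest = newest;
        this.upcoming = upcoming;
    }

    // Derives the three offsets from the ordered numeric offsets of the
    // messages currently in a partition, following the Javadoc literally.
    static DocumentedOffsets fromMessageOffsets(List<Long> offsets) {
        if (offsets.isEmpty()) {
            // The Javadoc states that a null value means the stream is empty.
            return new DocumentedOffsets(null, null, null);
        }
        long first = offsets.get(0);
        long last = offsets.get(offsets.size() - 1);
        return new DocumentedOffsets(
            String.valueOf(first), String.valueOf(last), String.valueOf(last + 1));
    }

    public static void main(String[] args) {
        // Two messages with offsets 0 and 1, as in the Javadoc example.
        DocumentedOffsets m = fromMessageOffsets(Arrays.asList(0L, 1L));
        System.out.println(m.oldest + " " + m.newest + " " + m.upcoming);
    }
}
```

For the Javadoc's own example (offsets 0 and 1) this yields oldest 0, newest 1, and upcoming 2.
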
In the case of HdfsSystemConsumer, messages are read from HDFS files until all 
have been read, [and then an end-of-stream envelope is appended to the end of 
the message buffer|#L237]]. The offset metadata returned per partition is:

oldestOffset: offset of the beginning of the _first_ file in HDFS

{color:#ff0000}newestOffset: offset of the beginning of the _last_ file in HDFS{color}
 * {color:#ff0000}Does not include the end-of-stream envelope in the offset count{color}

{color:#ff0000}upcomingOffset: null{color}
 * {color:#ff0000}Per SystemStreamPartitionMetadata, "null" is meant to 
indicate an empty stream.{color}

 reference: 
[https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java#L210]
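The mismatch described above can be stated as a simple check, sketched here under assumptions: if null means "empty", then either all three offsets should be null or none should be. OffsetNullCheck and nullsAreConsistent are hypothetical names, and the offset strings are placeholders, since the real HDFS offset format is not shown in this issue.

```java
// Hypothetical checker for illustration only; it is not part of Samza.
final class OffsetNullCheck {
    // Returns true when the three offsets agree with the documented rule that
    // null means "empty": either all of them are null or none of them is.
    static boolean nullsAreConsistent(String oldest, String newest, String upcoming) {
        boolean allNull = oldest == null && newest == null && upcoming == null;
        boolean noneNull = oldest != null && newest != null && upcoming != null;
        return allNull || noneNull;
    }

    public static void main(String[] args) {
        // HDFS-style metadata as described above: non-null oldest and newest,
        // null upcoming. The strings are placeholders, not real offsets.
        System.out.println(nullsAreConsistent("first-file-start", "last-file-start", null));
        // Metadata that follows the Javadoc example.
        System.out.println(nullsAreConsistent("0", "1", "2"));
    }
}
```

Under this reading, the HDFS-style metadata fails the check, because a caller that treats a null upcomingOffset as "empty stream" would contradict the non-null oldestOffset and newestOffset.
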

{color:#172b4d}Further differences are seen in InMemorySystemAdmin. When it is 
used as a bounded stream, an IncomingMessageEnvelope with object = 
EndOfStreamMessage and offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET 
is [placed at the end of the buffer|#L72]], as in the HdfsSystemConsumer case 
above.{color}

{color:#172b4d}However, InMemorySystemAdmin will generate the following 
metadata per partition:{color}

{color:#172b4d}oldestOffset: 0{color}

{color:#172b4d}newestOffset: numeric index of last message in buffer{color}

{color:#172b4d}upcomingOffset: newestOffset + 1{color}
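
The index-based scheme listed above can be sketched as follows. This is a hypothetical model, not Samza's InMemoryManager: InMemoryOffsetSketch, metadataFor, and END_MARKER are names made up for the example, and whether the end-of-stream envelope is actually counted in the buffer index is an assumption that this issue does not confirm.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model for illustration only; it is not Samza's InMemoryManager.
final class InMemoryOffsetSketch {
    // Stand-in for IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
    static final String END_MARKER = "END_OF_STREAM";

    // Returns {oldestOffset, newestOffset, upcomingOffset} for a non-empty
    // buffer: oldest is 0, newest is the index of the last buffer entry,
    // and upcoming is newest + 1.
    static String[] metadataFor(List<String> buffer) {
        if (buffer.isEmpty()) {
            throw new IllegalArgumentException("sketch only covers non-empty buffers");
        }
        int newest = buffer.size() - 1;
        return new String[] {"0", String.valueOf(newest), String.valueOf(newest + 1)};
    }

    public static void main(String[] args) {
        // Two data messages only.
        System.out.println(Arrays.toString(metadataFor(Arrays.asList("a", "b"))));
        // Two data messages followed by an end-of-stream marker in the same buffer.
        System.out.println(Arrays.toString(metadataFor(Arrays.asList("a", "b", END_MARKER))));
    }
}
```

If the end-of-stream marker lives in the same buffer, newestOffset points at the marker rather than at the last data message, which differs from the HDFS case above where the end-of-stream envelope is not included.
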

reference:

  was:
When consuming a bounded stream (HdfsSystemConsumer or InMemorySystemConsumer), 
an IncomingMessageEnvelope with object = EndOfStreamMessage and offset = 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET is used to delimit the stream and 
signal that there is no more data to be consumed.

It is however problematic that each of InMemorySystemAdmin, HdfsSystemAdmin, 
and SystemStreamPartitionMetadata offer a different usage of "oldestOffset", 
"newestOffset", and "upcomingOffset".

As described in SystemStreamPartitionMetadata:
{code:java}
/**
 * @return The oldest offset that still exists in the stream for the
 *         partition given. If a partition has two messages with offsets 0
 *         and 1, respectively, then this method would return 0 for the
 *         oldest offset. This offset is useful when one wishes to read all
 *         messages in a stream from the very beginning. A null value means
 *         the stream is empty.
 */
public String getOldestOffset() {
  return oldestOffset;
}

/**
 * @return The newest offset that exists in the stream for the partition
 *         given. If a partition has two messages with offsets 0 and 1,
 *         respectively, then this method would return 1 for the newest
 *         offset. This offset is useful when one wishes to see if all
 *         messages have been read from a stream (offset of last message
 *         read == newest offset). A null value means the stream is empty.
 */
public String getNewestOffset() {
  return newestOffset;
}

/**
 * @return The offset that represents the next message to be written in the
 *         stream for the partition given. If a partition has two messages
 *         with offsets 0 and 1, respectively, then this method would return
 *         2 for the upcoming offset. This offset is useful when one wishes
 *         to pick up reading at the very end of a stream. A null value
 *         means the stream is empty.
 */
public String getUpcomingOffset() {
  return upcomingOffset;
}
{code}
In the case of HdfsSystemConsumer, messages are read from hdfs files until all 
have been read,

[and then an end of stream envelope is appended to the end of the message 
buffer|[https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java#L237]].
 The offset metadata returned per partition is:

oldestOffset: offset of beginning of _first_ file in hdfs

{color:#FF0000}newestOffset: offset of beginning of _last_ file in hdfs{color}
 * {color:#ff0000}Does not include end of stream in offset count{color}

{color:#FF0000}upcomingOffset: null{color}
 * {color:#FF0000}Per SystemStreamPartitionMetadata, "null" is meant to 
indicate empty.{color}

 

{color:#172b4d}Further differences are seen in InMemorySystemAdmin, when used 
as a bounded stream an IncomingMessageEnvelope is [placed at the end of the 
buffer|[https://github.com/Sanil15/samza/blob/master/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java#L72]],
 as in the HdfsSystemConsumer case above, with an object = EndOfStreamMessage 
and offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET.{color}

{color:#172b4d}However, InMemorySystemAdmin will generate the following 
metadata{color}

{color:#172b4d}oldestOffset: 0{color}

{color:#172b4d}newestOffset: numeric index of last message in buffer{color}

{color:#172b4d}oldestOffset: newestOffset + 1{color}


> Inconsistent End of stream offset semantics in SystemStreamPartitionMetadata 
> offsets
> ------------------------------------------------------------------------------------
>
>                 Key: SAMZA-2506
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2506
>             Project: Samza
>          Issue Type: New Feature
>            Reporter: Brett Konold
>            Priority: Major
>
> When consuming a bounded stream (HdfsSystemConsumer or 
> InMemorySystemConsumer), an IncomingMessageEnvelope with object = 
> EndOfStreamMessage and offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET 
> is used to delimit the stream and signal that there is no more data to be 
> consumed.
> It is problematic, however, that InMemorySystemAdmin, HdfsSystemAdmin, and 
> SystemStreamPartitionMetadata each use "oldestOffset", "newestOffset", and 
> "upcomingOffset" differently.
> As described in SystemStreamPartitionMetadata:
> {code:java}
> /**
>  * @return The oldest offset that still exists in the stream for the
>  *         partition given. If a partition has two messages with offsets 0
>  *         and 1, respectively, then this method would return 0 for the
>  *         oldest offset. This offset is useful when one wishes to read all
>  *         messages in a stream from the very beginning. A null value means
>  *         the stream is empty.
>  */
> public String getOldestOffset() {
>   return oldestOffset;
> }
> /**
>  * @return The newest offset that exists in the stream for the partition
>  *         given. If a partition has two messages with offsets 0 and 1,
>  *         respectively, then this method would return 1 for the newest
>  *         offset. This offset is useful when one wishes to see if all
>  *         messages have been read from a stream (offset of last message
>  *         read == newest offset). A null value means the stream is empty.
>  */
> public String getNewestOffset() {
>   return newestOffset;
> }
> /**
>  * @return The offset that represents the next message to be written in the
>  *         stream for the partition given. If a partition has two messages
>  *         with offsets 0 and 1, respectively, then this method would return
>  *         2 for the upcoming offset. This offset is useful when one wishes
>  *         to pick up reading at the very end of a stream. A null value
>  *         means the stream is empty.
>  */
> public String getUpcomingOffset() {
>   return upcomingOffset;
> }
> {code}
> In the case of HdfsSystemConsumer, messages are read from HDFS files until 
> all have been read, [and then an end-of-stream envelope is appended to the 
> end of the message buffer|#L237]]. The offset metadata returned per 
> partition is:
> oldestOffset: offset of the beginning of the _first_ file in HDFS
> {color:#ff0000}newestOffset: offset of the beginning of the _last_ file in HDFS{color}
>  * {color:#ff0000}Does not include the end-of-stream envelope in the offset count{color}
> {color:#ff0000}upcomingOffset: null{color}
>  * {color:#ff0000}Per SystemStreamPartitionMetadata, "null" is meant to 
> indicate an empty stream.{color}
>  reference: 
> [https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java#L210]
> {color:#172b4d}Further differences are seen in InMemorySystemAdmin. When it 
> is used as a bounded stream, an IncomingMessageEnvelope with object = 
> EndOfStreamMessage and offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET 
> is [placed at the end of the buffer|#L72]], as in the HdfsSystemConsumer case 
> above.{color}
> {color:#172b4d}However, InMemorySystemAdmin will generate the following 
> metadata per partition:{color}
> {color:#172b4d}oldestOffset: 0{color}
> {color:#172b4d}newestOffset: numeric index of last message in buffer{color}
> {color:#172b4d}upcomingOffset: newestOffset + 1{color}
> reference:



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
