[
https://issues.apache.org/jira/browse/SAMZA-2506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Brett Konold updated SAMZA-2506:
--------------------------------
Description:
When consuming a bounded stream (HdfsSystemConsumer or InMemorySystemConsumer),
an IncomingMessageEnvelope with object = EndOfStreamMessage and offset =
IncomingMessageEnvelope.END_OF_STREAM_OFFSET is used to delimit the stream and
signal that there is no more data to be consumed.
It is problematic, however, that InMemorySystemAdmin, HdfsSystemAdmin, and
SystemStreamPartitionMetadata each assign a different meaning to
"oldestOffset", "newestOffset", and "upcomingOffset".
As described in SystemStreamPartitionMetadata:
{code:java}
/**
 * @return The oldest offset that still exists in the stream for the
 *         partition given. If a partition has two messages with offsets 0
 *         and 1, respectively, then this method would return 0 for the
 *         oldest offset. This offset is useful when one wishes to read all
 *         messages in a stream from the very beginning. A null value means
 *         the stream is empty.
 */
public String getOldestOffset() {
  return oldestOffset;
}

/**
 * @return The newest offset that exists in the stream for the partition
 *         given. If a partition has two messages with offsets 0 and 1,
 *         respectively, then this method would return 1 for the newest
 *         offset. This offset is useful when one wishes to see if all
 *         messages have been read from a stream (offset of last message
 *         read == newest offset). A null value means the stream is empty.
 */
public String getNewestOffset() {
  return newestOffset;
}

/**
 * @return The offset that represents the next message to be written in the
 *         stream for the partition given. If a partition has two messages
 *         with offsets 0 and 1, respectively, then this method would return
 *         2 for the upcoming offset. This offset is useful when one wishes
 *         to pick up reading at the very end of a stream. A null value
 *         means the stream is empty.
 */
public String getUpcomingOffset() {
  return upcomingOffset;
}
{code}
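To make the documented contract concrete, here is a minimal sketch of how the three offsets would be derived for a partition. OffsetContractSketch and fromOffsets are hypothetical names used only for illustration, not Samza classes, and the sketch assumes dense, ascending, numeric offsets.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for illustration; this is NOT Samza's
// SystemStreamPartitionMetadata, only a model of its documented contract.
final class OffsetContractSketch {
    final String oldest;
    final String newest;
    final String upcoming;

    OffsetContractSketch(String oldest, String newest, String upcoming) {
        this.oldest = oldest;
        this.newest = newest;
        this.upcoming = upcoming;
    }

    // Derives the three offsets from the offsets of the messages currently
    // in a partition. Assumes the list is sorted ascending with no gaps.
    static OffsetContractSketch fromOffsets(List<Long> offsets) {
        if (offsets.isEmpty()) {
            // Per the Javadoc, a null value means the stream is empty.
            return new OffsetContractSketch(null, null, null);
        }
        long first = offsets.get(0);
        long last = offsets.get(offsets.size() - 1);
        return new OffsetContractSketch(
            Long.toString(first), Long.toString(last), Long.toString(last + 1));
    }

    public static void main(String[] args) {
        // The Javadoc example: two messages with offsets 0 and 1.
        OffsetContractSketch m = fromOffsets(Arrays.asList(0L, 1L));
        System.out.println(m.oldest + " " + m.newest + " " + m.upcoming); // prints "0 1 2"
    }
}
```

Under this reading, a null offset appears only for an empty partition, and upcomingOffset is always one past newestOffset.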
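In the case of HdfsSystemConsumer, messages are read from HDFS files until all
have been read,
[and then an end-of-stream envelope is appended to the end of the message
buffer|https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java#L237].
The offset metadata returned per partition is: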
oldestOffset: offset of the beginning of the _first_ file in HDFS
{color:#ff0000}newestOffset: offset of the beginning of the _last_ file in HDFS{color}
* {color:#ff0000}Does not include the end-of-stream envelope in the offset count{color}
{color:#ff0000}upcomingOffset: null{color}
* {color:#ff0000}Per SystemStreamPartitionMetadata, a null value is meant to
indicate an empty stream.{color}
reference:
[https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java#L210]
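As a rough illustration of why the HDFS values above are a problem, the sketch below applies the two checks suggested by the SystemStreamPartitionMetadata Javadoc to metadata shaped like the HDFS case. BoundedStreamChecksSketch, looksEmpty, caughtUp, and the offset strings are all hypothetical; the end-of-stream sentinel is a placeholder, not the actual value of IncomingMessageEnvelope.END_OF_STREAM_OFFSET.

```java
// Hypothetical helper for illustration; none of these names exist in Samza.
final class BoundedStreamChecksSketch {
    // Placeholder sentinel. The real value of
    // IncomingMessageEnvelope.END_OF_STREAM_OFFSET is not assumed here.
    static final String END_OF_STREAM_PLACEHOLDER = "<end-of-stream>";

    // Javadoc reading: a null upcoming offset means the stream is empty.
    static boolean looksEmpty(String upcomingOffset) {
        return upcomingOffset == null;
    }

    // Javadoc reading: all messages have been read when the offset of the
    // last message read equals the newest offset.
    static boolean caughtUp(String lastReadOffset, String newestOffset) {
        return newestOffset != null && newestOffset.equals(lastReadOffset);
    }

    public static void main(String[] args) {
        // Made-up values shaped like the HDFS metadata described above:
        // newestOffset points at the start of the last file, upcomingOffset is null.
        String newestOffset = "last-file:0";
        String upcomingOffset = null;

        // A non-empty partition is classified as empty.
        System.out.println(looksEmpty(upcomingOffset)); // prints "true"

        // The last envelope a consumer sees carries the end-of-stream offset,
        // which is not counted in newestOffset, so the check does not succeed.
        System.out.println(caughtUp(END_OF_STREAM_PLACEHOLDER, newestOffset)); // prints "false"
    }
}
```

Whether real callers perform exactly these checks is an assumption; the point is only that both Javadoc-derived readings give misleading answers for the HDFS metadata.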
{color:#172b4d}Further differences are seen in InMemorySystemAdmin. When it is
used as a bounded stream, an IncomingMessageEnvelope is [placed at the end of the
buffer|https://github.com/Sanil15/samza/blob/master/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java#L72],
as in the HdfsSystemConsumer case above, with object = EndOfStreamMessage and
offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET.{color}
{color:#172b4d}However, InMemorySystemAdmin will generate the following
metadata per partition:{color}
{color:#172b4d}oldestOffset: 0{color}
{color:#172b4d}newestOffset: numeric index of last message in buffer{color}
{color:#172b4d}upcomingOffset: newestOffset + 1{color}
reference:
was:
When consuming a bounded stream (HdfsSystemConsumer or InMemorySystemConsumer),
an IncomingMessageEnvelope with object = EndOfStreamMessage and offset =
IncomingMessageEnvelope.END_OF_STREAM_OFFSET is used to delimit the stream and
signal that there is no more data to be consumed.
It is problematic, however, that InMemorySystemAdmin, HdfsSystemAdmin, and
SystemStreamPartitionMetadata each assign a different meaning to
"oldestOffset", "newestOffset", and "upcomingOffset".
As described in SystemStreamPartitionMetadata:
{code:java}
/**
 * @return The oldest offset that still exists in the stream for the
 *         partition given. If a partition has two messages with offsets 0
 *         and 1, respectively, then this method would return 0 for the
 *         oldest offset. This offset is useful when one wishes to read all
 *         messages in a stream from the very beginning. A null value means
 *         the stream is empty.
 */
public String getOldestOffset() {
  return oldestOffset;
}

/**
 * @return The newest offset that exists in the stream for the partition
 *         given. If a partition has two messages with offsets 0 and 1,
 *         respectively, then this method would return 1 for the newest
 *         offset. This offset is useful when one wishes to see if all
 *         messages have been read from a stream (offset of last message
 *         read == newest offset). A null value means the stream is empty.
 */
public String getNewestOffset() {
  return newestOffset;
}

/**
 * @return The offset that represents the next message to be written in the
 *         stream for the partition given. If a partition has two messages
 *         with offsets 0 and 1, respectively, then this method would return
 *         2 for the upcoming offset. This offset is useful when one wishes
 *         to pick up reading at the very end of a stream. A null value
 *         means the stream is empty.
 */
public String getUpcomingOffset() {
  return upcomingOffset;
}
{code}
In the case of HdfsSystemConsumer, messages are read from HDFS files until all
have been read,
[and then an end-of-stream envelope is appended to the end of the message
buffer|https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java#L237].
The offset metadata returned per partition is:
oldestOffset: offset of the beginning of the _first_ file in HDFS
{color:#FF0000}newestOffset: offset of the beginning of the _last_ file in HDFS{color}
* {color:#ff0000}Does not include the end-of-stream envelope in the offset count{color}
{color:#FF0000}upcomingOffset: null{color}
* {color:#FF0000}Per SystemStreamPartitionMetadata, a null value is meant to
indicate an empty stream.{color}
{color:#172b4d}Further differences are seen in InMemorySystemAdmin. When it is
used as a bounded stream, an IncomingMessageEnvelope is [placed at the end of the
buffer|https://github.com/Sanil15/samza/blob/master/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java#L72],
as in the HdfsSystemConsumer case above, with object = EndOfStreamMessage
and offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET.{color}
{color:#172b4d}However, InMemorySystemAdmin will generate the following
metadata{color}
{color:#172b4d}oldestOffset: 0{color}
{color:#172b4d}newestOffset: numeric index of last message in buffer{color}
{color:#172b4d}upcomingOffset: newestOffset + 1{color}
> Inconsistent End of stream offset semantics in SystemStreamPartitionMetadata
> offsets
> ------------------------------------------------------------------------------------
>
> Key: SAMZA-2506
> URL: https://issues.apache.org/jira/browse/SAMZA-2506
> Project: Samza
> Issue Type: New Feature
> Reporter: Brett Konold
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.3.4#803005)