Brett Konold created SAMZA-2506:
-----------------------------------
Summary: Inconsistent End of stream offset semantics in
SystemStreamPartitionMetadata offsets
Key: SAMZA-2506
URL: https://issues.apache.org/jira/browse/SAMZA-2506
Project: Samza
Issue Type: New Feature
Reporter: Brett Konold
When consuming a bounded stream (HdfsSystemConsumer or InMemorySystemConsumer),
an IncomingMessageEnvelope with object = EndOfStreamMessage and offset =
IncomingMessageEnvelope.END_OF_STREAM_OFFSET is used to delimit the stream and
signal that there is no more data to be consumed.
However, InMemorySystemAdmin, HdfsSystemAdmin, and the
SystemStreamPartitionMetadata Javadoc each give "oldestOffset",
"newestOffset", and "upcomingOffset" a different meaning.
As described in SystemStreamPartitionMetadata:
{code:java}
/**
* @return The oldest offset that still exists in the stream for the
* partition given. If a partition has two messages with offsets 0
* and 1, respectively, then this method would return 0 for the
* oldest offset. This offset is useful when one wishes to read all
* messages in a stream from the very beginning. A null value means
* the stream is empty.
*/
public String getOldestOffset() {
return oldestOffset;
}
/**
* @return The newest offset that exists in the stream for the partition
* given. If a partition has two messages with offsets 0 and 1,
* respectively, then this method would return 1 for the newest
* offset. This offset is useful when one wishes to see if all
* messages have been read from a stream (offset of last message
* read == newest offset). A null value means the stream is empty.
*/
public String getNewestOffset() {
return newestOffset;
}
/**
* @return The offset that represents the next message to be written in the
* stream for the partition given. If a partition has two messages
* with offsets 0 and 1, respectively, then this method would return
* 2 for the upcoming offset. This offset is useful when one wishes
* to pick up reading at the very end of a stream. A null value
* means the stream is empty.
*/
public String getUpcomingOffset() {
return upcomingOffset;
}
{code}
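A minimal sketch of the contract the Javadoc above describes, for illustration only. OffsetTriple and forMessageCount are hypothetical names, not Samza classes, and the sketch assumes dense integer offsets starting at 0 (which is only the Javadoc's own example) and takes "a null value means the stream is empty" literally for all three getters:

```java
/** Hypothetical stand-in for SystemStreamPartitionMetadata; not Samza code. */
final class OffsetTriple {
  final String oldest;
  final String newest;
  final String upcoming;

  OffsetTriple(String oldest, String newest, String upcoming) {
    this.oldest = oldest;
    this.newest = newest;
    this.upcoming = upcoming;
  }

  /**
   * Offsets for a partition holding messageCount messages at offsets
   * 0 .. messageCount - 1 (assumption: dense integer offsets).
   */
  static OffsetTriple forMessageCount(int messageCount) {
    if (messageCount < 0) {
      throw new IllegalArgumentException("messageCount must be >= 0");
    }
    if (messageCount == 0) {
      // Per the Javadoc, null means the stream is empty.
      return new OffsetTriple(null, null, null);
    }
    return new OffsetTriple(
        "0",                               // first message still in the stream
        String.valueOf(messageCount - 1),  // last message written
        String.valueOf(messageCount));     // next message to be written
  }
}
```

For the two-message partition used in the Javadoc, OffsetTriple.forMessageCount(2) yields oldest "0", newest "1", and upcoming "2".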
In the case of HdfsSystemConsumer, messages are read from HDFS files until all
have been read,
[and then an end of stream envelope is appended to the end of the message
buffer|https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java#L237].
The offset metadata returned per partition is:
oldestOffset: offset of beginning of _first_ file in hdfs
{color:#FF0000}newestOffset: offset of beginning of _last_ file in hdfs{color}
* {color:#ff0000}Does not include end of stream in offset count{color}
{color:#FF0000}upcomingOffset: null{color}
* {color:#FF0000}Per SystemStreamPartitionMetadata, "null" is meant to
indicate empty.{color}
InMemorySystemAdmin differs further. When it is used as a bounded stream, an
IncomingMessageEnvelope with object = EndOfStreamMessage and offset =
IncomingMessageEnvelope.END_OF_STREAM_OFFSET is [placed at the end of the
buffer|https://github.com/Sanil15/samza/blob/master/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java#L72],
as in the HdfsSystemConsumer case above.
However, InMemorySystemAdmin will generate the following metadata:
oldestOffset: 0
newestOffset: numeric index of last message in buffer
upcomingOffset: newestOffset + 1
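The mismatch between the two admins can be illustrated with a small check of the "null means empty" rule from the Javadoc. This is a sketch only: OffsetConsistencyCheck and nullMeansEmpty are hypothetical names, and the HDFS-style offset strings in the usage note are placeholders, not the real HdfsSystemAdmin offset format.

```java
/** Hypothetical helper for illustration; not part of Samza. */
final class OffsetConsistencyCheck {

  private OffsetConsistencyCheck() {
  }

  /**
   * Returns true when the metadata follows the Javadoc rule literally:
   * either no offset is null (non-empty stream) or all three are null
   * (empty stream). A mix of null and non-null offsets returns false.
   */
  static boolean nullMeansEmpty(String oldest, String newest, String upcoming) {
    boolean anyNull = oldest == null || newest == null || upcoming == null;
    boolean allNull = oldest == null && newest == null && upcoming == null;
    return !anyNull || allNull;
  }
}
```

Under this check, HDFS-style metadata such as nullMeansEmpty("first-file", "last-file", null) returns false, because upcomingOffset is null although the stream is not empty, while in-memory-style metadata such as nullMeansEmpty("0", "4", "5") returns true.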