[https://issues.apache.org/jira/browse/SAMZA-2506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Brett Konold updated SAMZA-2506:
--------------------------------
Description:
When consuming a bounded stream (HdfsSystemConsumer or InMemorySystemConsumer),
an IncomingMessageEnvelope with object = EndOfStreamMessage and offset =
IncomingMessageEnvelope.END_OF_STREAM_OFFSET is used to delimit the stream and
signal that there is no more data to be consumed.
However, InMemorySystemAdmin, HdfsSystemAdmin, and SystemStreamPartitionMetadata
each interpret "oldestOffset", "newestOffset", and "upcomingOffset" differently,
which is problematic.
As described in SystemStreamPartitionMetadata:
{code:java}
/**
 * @return The oldest offset that still exists in the stream for the
 *         partition given. If a partition has two messages with offsets 0
 *         and 1, respectively, then this method would return 0 for the
 *         oldest offset. This offset is useful when one wishes to read all
 *         messages in a stream from the very beginning. A null value means
 *         the stream is empty.
 */
public String getOldestOffset() {
  return oldestOffset;
}

/**
 * @return The newest offset that exists in the stream for the partition
 *         given. If a partition has two messages with offsets 0 and 1,
 *         respectively, then this method would return 1 for the newest
 *         offset. This offset is useful when one wishes to see if all
 *         messages have been read from a stream (offset of last message
 *         read == newest offset). A null value means the stream is empty.
 */
public String getNewestOffset() {
  return newestOffset;
}

/**
 * @return The offset that represents the next message to be written in the
 *         stream for the partition given. If a partition has two messages
 *         with offsets 0 and 1, respectively, then this method would return
 *         2 for the upcoming offset. This offset is useful when one wishes
 *         to pick up reading at the very end of a stream. A null value
 *         means the stream is empty.
 */
public String getUpcomingOffset() {
  return upcomingOffset;
}
{code}
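To make the documented contract concrete, here is a minimal sketch of the three offsets for a partition holding a given number of messages, plus the javadoc's "all messages read" check. It assumes numeric offsets starting at 0; DocumentedOffsets and forMessageCount are hypothetical names used for illustration only, not the Samza SystemStreamPartitionMetadata class.

{code:java}
/**
 * Hypothetical stand-in for SystemStreamPartitionMetadata, assuming numeric
 * offsets that start at 0. Illustrates the javadoc only; not Samza code.
 */
class DocumentedOffsets {
  final String oldest;
  final String newest;
  final String upcoming;

  DocumentedOffsets(String oldest, String newest, String upcoming) {
    this.oldest = oldest;
    this.newest = newest;
    this.upcoming = upcoming;
  }

  /** Metadata the javadoc describes for a partition with messageCount messages. */
  static DocumentedOffsets forMessageCount(int messageCount) {
    if (messageCount == 0) {
      // "A null value means the stream is empty."
      return new DocumentedOffsets(null, null, null);
    }
    return new DocumentedOffsets(
        "0",                              // oldest: first message
        String.valueOf(messageCount - 1), // newest: last message
        String.valueOf(messageCount));    // upcoming: next message to be written
  }

  /** Javadoc completion check: offset of last message read == newest offset. */
  boolean allRead(String lastReadOffset) {
    return newest != null && newest.equals(lastReadOffset);
  }

  public static void main(String[] args) {
    DocumentedOffsets m = forMessageCount(2);
    System.out.println(m.oldest + " " + m.newest + " " + m.upcoming); // prints "0 1 2"
    System.out.println(m.allRead("1"));                               // prints "true"
  }
}
{code}

Note that under this contract no end-of-stream marker takes part in the offset count: the newest offset is the last data message.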
In the case of HdfsSystemConsumer, messages are read from HDFS files until all
have been read, [and then an end-of-stream envelope is appended to the end of
the message buffer|#L237]. The offset metadata returned per partition is:
*oldestOffset*: offset of the beginning of the _first_ file in HDFS
{color:#ff0000}*newestOffset*: offset of the beginning of the _last_ file in HDFS{color}
* {color:#ff0000}Does NOT include the end of stream in the numeric offset count{color}
{color:#ff0000}*upcomingOffset*: null{color}
* {color:#ff0000}Per SystemStreamPartitionMetadata, null is meant to indicate
an empty stream. However, null seems reasonable here, as "upcoming" does not
make much sense in the context of bounded streams.{color}
reference:
[https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java#L210]
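The null upcomingOffset conflicts with the javadoc rule that null means empty. A minimal sketch, assuming a consumer that applies that rule literally: depending on which offset it inspects, the same HDFS-style metadata looks empty or non-empty. The class and method names, and the offset strings, are hypothetical placeholders, not the real HDFS offset format.

{code:java}
/**
 * Sketch of the metadata shape described for HDFS (oldest and newest set,
 * upcoming null). Offset strings are placeholders, not real HDFS offsets.
 */
class HdfsNullUpcomingDemo {
  /** Minimal hypothetical holder for the three offsets. */
  static final class Metadata {
    final String oldest;
    final String newest;
    final String upcoming;

    Metadata(String oldest, String newest, String upcoming) {
      this.oldest = oldest;
      this.newest = newest;
      this.upcoming = upcoming;
    }
  }

  /** Emptiness judged from the oldest offset, per "null means empty". */
  static boolean emptyByOldest(Metadata m) {
    return m.oldest == null;
  }

  /** Emptiness judged from the upcoming offset, per "null means empty". */
  static boolean emptyByUpcoming(Metadata m) {
    return m.upcoming == null;
  }

  public static void main(String[] args) {
    Metadata hdfs = new Metadata("first-file-start", "last-file-start", null);
    System.out.println(emptyByOldest(hdfs));   // prints "false"
    System.out.println(emptyByUpcoming(hdfs)); // prints "true"
  }
}
{code}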
{color:#172b4d}Further differences are seen in InMemorySystem, where, as in the
HdfsSystemConsumer case above, an IncomingMessageEnvelope with object =
EndOfStreamMessage and offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET is
[placed at the end of the buffer|#L72].{color}
{color:#172b4d}However, InMemorySystemAdmin will generate the following
metadata per partition:{color}
{color:#172b4d}*oldestOffset*: 0{color}
{color:#de350b}*newestOffset*: numeric index of the last message in the
buffer{color}
* {color:#de350b}This is the index of the end-of-stream message, which does not
match the HdfsSystemConsumer semantics above{color}
{color:#de350b}*upcomingOffset*: newestOffset + 1{color}
* {color:#de350b}This is reasonable, but again, "upcoming" does not make much
sense in the context of bounded streams{color}
reference:
[https://github.com/Sanil15/samza/blob/master/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java#L182]
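For a buffer holding two data messages followed by the end-of-stream marker, the behaviour described above yields different offsets than the javadoc contract. A minimal sketch, assuming the buffer contains at least one data message and ends with the marker; the class, method names, and the EOS placeholder string are hypothetical and do not reproduce InMemoryManager's code.

{code:java}
import java.util.Arrays;
import java.util.List;

/**
 * Contrasts the InMemorySystemAdmin metadata described above with the
 * javadoc semantics. Hypothetical names; not InMemoryManager's code.
 * Assumes at least one data message, followed by the EOS marker.
 */
class InMemoryOffsetsDemo {
  // Placeholder for the EndOfStreamMessage envelope at the end of the buffer.
  static final String EOS = "END_OF_STREAM";

  /** {oldest, newest, upcoming} as described for InMemorySystemAdmin. */
  static String[] inMemoryMetadata(List<String> buffer) {
    int newest = buffer.size() - 1; // index of the last element: the EOS marker
    return new String[] {"0", String.valueOf(newest), String.valueOf(newest + 1)};
  }

  /** {oldest, newest, upcoming} per the javadoc, counting data messages only. */
  static String[] documentedMetadata(List<String> buffer) {
    long dataCount = buffer.stream().filter(m -> !EOS.equals(m)).count();
    return new String[] {"0", String.valueOf(dataCount - 1), String.valueOf(dataCount)};
  }

  public static void main(String[] args) {
    List<String> buffer = Arrays.asList("m0", "m1", EOS);
    System.out.println(Arrays.toString(inMemoryMetadata(buffer)));   // prints "[0, 2, 3]"
    System.out.println(Arrays.toString(documentedMetadata(buffer))); // prints "[0, 1, 2]"
  }
}
{code}

The newest offset is off by one relative to the javadoc because the end-of-stream message is counted.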
*Impact:*
*tl;dr:* this blocks refactoring side input consumption to leverage RunLoop.
Side input consumption currently blocks the main container startup thread in
ContainerStorageManager (CSM) until, for each side input SSP, either:
# an envelope from the SystemConsumer returns true for envelope.isEndOfStream()
(i.e., the envelope's offset is IncomingMessageEnvelope.END_OF_STREAM_OFFSET), or
# an envelope's offset equals the newest offset for that SSP, as fetched at CSM
initialization
We would like to refactor this flow to leverage RunLoop rather than using
SystemConsumers directly, in order to take advantage of its message
dispatching, concurrency, etc.
Since RunLoop [sinks end-of-stream
messages|https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/RunLoop.java#L706]
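A toy model of the two exit conditions listed above, assuming that end-of-stream envelopes never reach the side input check once consumption goes through RunLoop. It further assumes that data envelopes carry their buffer index as offset and that newestOffset follows the InMemorySystemAdmin behaviour (index of the end-of-stream message). SideInputCatchUpDemo and its methods are hypothetical; this is not ContainerStorageManager's code.

{code:java}
import java.util.Arrays;
import java.util.List;

/**
 * Toy model of the side input exit conditions. Hypothetical names and
 * offsets; not ContainerStorageManager or RunLoop code.
 */
class SideInputCatchUpDemo {
  // Placeholder for IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
  static final String EOS_OFFSET = "END_OF_STREAM";

  /** Condition 1 (end of stream) or condition 2 (offset == newest offset). */
  static boolean caughtUp(String offset, String newestOffset) {
    return EOS_OFFSET.equals(offset) || offset.equals(newestOffset);
  }

  /** True if any envelope that reaches the check satisfies it. */
  static boolean catchesUp(List<String> offsets, String newestOffset, boolean deliverEos) {
    for (String offset : offsets) {
      if (!deliverEos && EOS_OFFSET.equals(offset)) {
        continue; // assumed: the EOS envelope is dropped before the check
      }
      if (caughtUp(offset, newestOffset)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    List<String> offsets = Arrays.asList("0", "1", EOS_OFFSET);
    System.out.println(catchesUp(offsets, "2", true));  // prints "true": condition 1 fires
    System.out.println(catchesUp(offsets, "2", false)); // prints "false": no data offset is "2"
    System.out.println(catchesUp(offsets, "1", false)); // prints "true": javadoc-style newest
  }
}
{code}

Under these assumptions, only condition 2 remains once the end-of-stream envelope is dropped, so whether consumption ever completes depends entirely on which newestOffset semantics the system uses.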
*Possible Reconciliations:*
1.
2.
was:
When consuming a bounded stream (HdfsSystemConsumer or InMemorySystemConsumer),
an IncomingMessageEnvelope with object = EndOfStreamMessage and offset =
IncomingMessageEnvelope.END_OF_STREAM_OFFSET is used to delimit the stream and
signal that there is no more data to be consumed.
It is however problematic that each of InMemorySystemAdmin, HdfsSystemAdmin,
and SystemStreamPartitionMetadata offer a different usage of "oldestOffset",
"newestOffset", and "upcomingOffset".
As described in SystemStreamPartitionMetadata:
{code:java}
/**
* @return The oldest offset that still exists in the stream for the
* partition given. If a partition has two messages with offsets 0
* and 1, respectively, then this method would return 0 for the
* oldest offset. This offset is useful when one wishes to read all
* messages in a stream from the very beginning. A null value means
* the stream is empty.
*/
public String getOldestOffset() {
return oldestOffset;
}
/**
* @return The newest offset that exists in the stream for the partition
* given. If a partition has two messages with offsets 0 and 1,
* respectively, then this method would return 1 for the newest
* offset. This offset is useful when one wishes to see if all
* messages have been read from a stream (offset of last message
* read == newest offset). A null value means the stream is empty.
*/
public String getNewestOffset() {
return newestOffset;
}
/**
* @return The offset that represents the next message to be written in the
* stream for the partition given. If a partition has two messages
* with offsets 0 and 1, respectively, then this method would return
* 2 for the upcoming offset. This offset is useful when one wishes
* to pick up reading at the very end of a stream. A null value
* means the stream is empty.
*/
public String getUpcomingOffset() {
return upcomingOffset;
}
{code}
In the case of HdfsSystemConsumer, messages are read from hdfs files until all
have been read,
[and then an end of stream envelope is appended to the end of the message
buffer|#L237]. The offset metadata returned per partition is:
oldestOffset: offset of beginning of _first_ file in hdfs
{color:#ff0000}newestOffset: offset of beginning of _last_ file in hdfs{color}
* {color:#ff0000}Does not include end of stream in offset count{color}
{color:#ff0000}upcomingOffset: null{color}
* {color:#ff0000}Per SystemStreamPartitionMetadata, "null" is meant to
indicate empty.{color}
reference:
[https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java#L210]
{color:#172b4d}Further differences are seen in InMemorySystemAdmin, when used
as a bounded stream an IncomingMessageEnvelope is [placed at the end of the
buffer|#L72], as in the HdfsSystemConsumer case above, with an object =
EndOfStreamMessage and offset =
IncomingMessageEnvelope.END_OF_STREAM_OFFSET.{color}
{color:#172b4d}However, InMemorySystemAdmin will generate the following
metadata per partition:{color}
{color:#172b4d}oldestOffset: 0{color}
{color:#172b4d}newestOffset: numeric index of last message in buffer{color}
{color:#172b4d}oldestOffset: newestOffset + 1{color}
reference:
> Inconsistent End of stream offset semantics in SystemStreamPartitionMetadata offsets
> ------------------------------------------------------------------------------------
>
> Key: SAMZA-2506
> URL: https://issues.apache.org/jira/browse/SAMZA-2506
> Project: Samza
> Issue Type: New Feature
> Reporter: Brett Konold
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)