[ https://issues.apache.org/jira/browse/SAMZA-2506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Brett Konold updated SAMZA-2506:
--------------------------------
Summary: Inconsistent end of stream semantics in
SystemStreamPartitionMetadata (was: Inconsistent End of stream offset
semantics in SystemStreamPartitionMetadata offsets)
> Inconsistent end of stream semantics in SystemStreamPartitionMetadata
> ---------------------------------------------------------------------
>
> Key: SAMZA-2506
> URL: https://issues.apache.org/jira/browse/SAMZA-2506
> Project: Samza
> Issue Type: New Feature
> Reporter: Brett Konold
> Priority: Major
>
> When consuming a bounded stream (HdfsSystemConsumer or
> InMemorySystemConsumer), an IncomingMessageEnvelope with object =
> EndOfStreamMessage and offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET
> is used to delimit the stream and signal that there is no more data to be
> consumed.
> However, InMemorySystemAdmin, HdfsSystemAdmin, and the
> SystemStreamPartitionMetadata documentation each use "oldestOffset",
> "newestOffset", and "upcomingOffset" differently for these bounded
> streams, which is problematic.
> As described in SystemStreamPartitionMetadata:
> {code:java}
> /**
>  * @return The oldest offset that still exists in the stream for the
>  *         partition given. If a partition has two messages with offsets 0
>  *         and 1, respectively, then this method would return 0 for the
>  *         oldest offset. This offset is useful when one wishes to read all
>  *         messages in a stream from the very beginning. A null value means
>  *         the stream is empty.
>  */
> public String getOldestOffset() {
>   return oldestOffset;
> }
>
> /**
>  * @return The newest offset that exists in the stream for the partition
>  *         given. If a partition has two messages with offsets 0 and 1,
>  *         respectively, then this method would return 1 for the newest
>  *         offset. This offset is useful when one wishes to see if all
>  *         messages have been read from a stream (offset of last message
>  *         read == newest offset). A null value means the stream is empty.
>  */
> public String getNewestOffset() {
>   return newestOffset;
> }
>
> /**
>  * @return The offset that represents the next message to be written in the
>  *         stream for the partition given. If a partition has two messages
>  *         with offsets 0 and 1, respectively, then this method would return
>  *         2 for the upcoming offset. This offset is useful when one wishes
>  *         to pick up reading at the very end of a stream. A null value
>  *         means the stream is empty.
>  */
> public String getUpcomingOffset() {
>   return upcomingOffset;
> }
> {code}
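To make the documented contract concrete, here is a small self-contained sketch. It assumes a bounded partition whose messages carry consecutive integer offsets starting at 0; the class `DocumentedOffsets` and its factory `forMessageCount` are hypothetical names used for illustration only and are not part of the Samza API.

```java
/** Hypothetical holder mirroring the three fields of SystemStreamPartitionMetadata. */
final class DocumentedOffsets {
  final String oldestOffset;
  final String newestOffset;
  final String upcomingOffset;

  private DocumentedOffsets(String oldest, String newest, String upcoming) {
    this.oldestOffset = oldest;
    this.newestOffset = newest;
    this.upcomingOffset = upcoming;
  }

  /**
   * Offsets as the javadoc above describes them, for a partition holding
   * messageCount messages with offsets 0..messageCount-1.
   * Per the javadoc, null means the stream is empty.
   */
  static DocumentedOffsets forMessageCount(int messageCount) {
    if (messageCount < 0) {
      throw new IllegalArgumentException("messageCount must be >= 0");
    }
    if (messageCount == 0) {
      return new DocumentedOffsets(null, null, null);
    }
    return new DocumentedOffsets(
        "0",                               // oldest: first message still present
        String.valueOf(messageCount - 1),  // newest: last message present
        String.valueOf(messageCount));     // upcoming: next message to be written
  }
}
```

For the javadoc's two-message example, `forMessageCount(2)` yields oldest "0", newest "1", and upcoming "2".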
>
> Areas of concern are highlighted below in red.
>
> In the case of HdfsSystemConsumer, messages are read from HDFS files until
> all have been read, [and an end-of-stream envelope is appended to the end of
> the message buffer|#L237]. The offset metadata returned per partition is:
> *oldestOffset*: offset of the beginning of the _first_ file in HDFS
> {color:#ff0000}*newestOffset*: offset of the beginning of the _last_ file in
> HDFS{color}
> * {color:#ff0000}Does NOT count the end-of-stream envelope in the numeric
> offset{color}
> {color:#ff0000}*upcomingOffset*: null{color}
> * {color:#ff0000}Per SystemStreamPartitionMetadata, "null" is meant to
> indicate an empty stream. However, null seems reasonable here, as "upcoming"
> does not make much sense for bounded streams.{color}
> reference:
> [https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java#L210]
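The HDFS behavior described above can be modeled the same way. This is a simplification, not DirectoryPartitioner's actual offset format: purely for illustration, it assumes an offset is the index of the file at which it starts. `HdfsStyleOffsets` and `forFiles` are hypothetical names.

```java
import java.util.List;

/** Hypothetical model of the per-partition metadata described for HdfsSystemAdmin. */
final class HdfsStyleOffsets {
  final String oldestOffset;
  final String newestOffset;
  final String upcomingOffset;

  private HdfsStyleOffsets(String oldest, String newest, String upcoming) {
    this.oldestOffset = oldest;
    this.newestOffset = newest;
    this.upcomingOffset = upcoming;
  }

  /**
   * @param files the HDFS files assigned to one partition, in read order
   *              (simplification: an offset is modeled as a file index)
   */
  static HdfsStyleOffsets forFiles(List<String> files) {
    if (files.isEmpty()) {
      // This sketch does not model how an empty partition is reported.
      throw new IllegalArgumentException("sketch assumes at least one file");
    }
    String oldest = "0";                              // beginning of the first file
    String newest = String.valueOf(files.size() - 1); // beginning of the last file
    // The end-of-stream envelope is never counted, and upcoming is always null.
    return new HdfsStyleOffsets(oldest, newest, null);
  }
}
```

With three files, newestOffset is "2" (the start of the third file, however many messages that file holds) and upcomingOffset is null.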
>
>
>
> {color:#172b4d}Further differences are seen in InMemorySystem, where an
> IncomingMessageEnvelope is [placed at the end of the buffer|#L72] similar to
> the HdfsSystemConsumer case above, with an object = EndOfStreamMessage and
> offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET.{color}
> {color:#172b4d}However, InMemorySystemAdmin will generate the following
> metadata per partition:{color}
> {color:#172b4d}*oldestOffset*: 0{color}
> {color:#de350b}*newestOffset*: numeric index of the last message in the
> buffer{color}
> * {color:#de350b}This is the index of the end-of-stream message, which does
> not match the HdfsSystemConsumer semantics above{color}
> {color:#de350b}*upcomingOffset*: newestOffset + 1{color}
> * {color:#de350b}This is reasonable, but again, "upcoming" does not make much
> sense for bounded streams{color}
> reference:
> [https://github.com/Sanil15/samza/blob/master/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java#L182]
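For comparison, the InMemorySystemAdmin behavior described above can be sketched as follows. It assumes the partition buffer is a list whose last element is the end-of-stream envelope; `InMemoryStyleOffsets` and `forBuffer` are hypothetical names, and the logic paraphrases the description rather than copying InMemoryManager.

```java
import java.util.List;

/** Hypothetical model of the per-partition metadata described for InMemorySystemAdmin. */
final class InMemoryStyleOffsets {
  final String oldestOffset;
  final String newestOffset;
  final String upcomingOffset;

  private InMemoryStyleOffsets(String oldest, String newest, String upcoming) {
    this.oldestOffset = oldest;
    this.newestOffset = newest;
    this.upcomingOffset = upcoming;
  }

  /**
   * @param buffer envelopes buffered for one partition; the last element is
   *               assumed to be the end-of-stream envelope
   */
  static InMemoryStyleOffsets forBuffer(List<?> buffer) {
    if (buffer.isEmpty()) {
      throw new IllegalArgumentException("sketch assumes a terminated buffer");
    }
    int newest = buffer.size() - 1; // index of the end-of-stream envelope itself
    return new InMemoryStyleOffsets("0", String.valueOf(newest), String.valueOf(newest + 1));
  }
}
```

For a buffer of two data messages plus the end-of-stream envelope, newestOffset is "2" (the end-of-stream envelope) and upcomingOffset is "3", whereas the HDFS model above never counts the envelope.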
>
>
> *Impact:*
> tl;dr: this blocks refactoring side input consumption to leverage RunLoop.
> Side input consumption currently blocks the main container start-up thread in
> ContainerStorageManager (CSM) until, for each side input SSP:
> # the envelope from the SystemConsumer returns true for envelope.isEndOfStream()
> (the offset of the envelope is IncomingMessageEnvelope.END_OF_STREAM_OFFSET)
> # the envelope offset is equal to the newest offset for that SSP fetched at CSM
> initialization
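The start-up check above can be sketched as a single predicate. This sketch assumes an SSP counts as caught up as soon as either condition holds, and that offsets are compared for string equality as the text describes; `SideInputCatchUp` and `isCaughtUp` are hypothetical names, not ContainerStorageManager's actual methods.

```java
/** Hypothetical sketch of the per-SSP check performed before container start-up continues. */
final class SideInputCatchUp {
  /**
   * @param envelopeOffset offset of the envelope just read from the SystemConsumer
   * @param isEndOfStream  result of envelope.isEndOfStream()
   * @param newestAtInit   newest offset for the SSP fetched when CSM was initialized
   */
  static boolean isCaughtUp(String envelopeOffset, boolean isEndOfStream, String newestAtInit) {
    if (isEndOfStream) {
      return true; // condition 1: end-of-stream envelope observed
    }
    // condition 2: reached the newest offset recorded at initialization
    return newestAtInit != null && newestAtInit.equals(envelopeOffset);
  }
}
```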
> We would like to refactor this flow to leverage RunLoop rather than using
> SystemConsumers directly, to take advantage of its message dispatching,
> concurrency, etc.
> Since RunLoop [sinks end of stream messages|#L706] (they are not passed on to
> tasks), condition 1 can no longer be checked, so CSM needs a consistent way to
> determine that an SSP's consumption has reached the "head" or "end" of the
> stream according to the offset metadata fetched at init time.
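The following sketch shows why condition 2 alone is not enough with the current InMemorySystemAdmin metadata. It assumes a bounded partition holding three data messages (offsets 0 to 2) followed by the end-of-stream envelope (offset 3), and that RunLoop delivers only the data messages to the task; `RunLoopCatchUpDemo` and `reachesNewest` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

final class RunLoopCatchUpDemo {
  /** Returns true if any offset delivered to the task equals the newest offset fetched at init. */
  static boolean reachesNewest(List<String> deliveredOffsets, String newestAtInit) {
    for (String offset : deliveredOffsets) {
      if (offset.equals(newestAtInit)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // Offsets 0..2 are data; offset 3 is the end-of-stream envelope, which RunLoop does not deliver.
    List<String> delivered = Arrays.asList("0", "1", "2");

    String inMemoryNewest = "3"; // current InMemorySystemAdmin: index of the end-of-stream envelope
    String dataOnlyNewest = "2"; // newest offset that excludes the end-of-stream envelope

    System.out.println("InMemory newest reached: " + reachesNewest(delivered, inMemoryNewest));   // false
    System.out.println("Data-only newest reached: " + reachesNewest(delivered, dataOnlyNewest));  // true
  }
}
```

With the current metadata, the side input SSP would never be considered caught up.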
> *Possible (partial) reconciliation:*
> # Modify InMemorySystemAdmin to return "newestOffset" as the numeric index
> of the buffered message immediately preceding the end-of-stream message. This
> would make the semantics of "newestOffset" match those of HdfsSystemAdmin.
> ## (+) Minimal semantic change in a component used primarily internally in
> Samza (InMemorySystem is used in the test runner)
> ## (-) Semantics of "upcomingOffset" would be unchanged and left
> inconsistent with HdfsSystemAdmin
> # Modify RunLoop to pass end-of-stream messages through to tasks, either
> directly or by invoking a callback
> ## (+) EndOfStreamListenerTask today is task-wide, and is only called when ALL
> SSPs in a task reach end of stream. Per-SSP granularity might be welcome
> flexibility for customers who intermingle bounded and unbounded streams in the
> same task.
> ## (-) It is uncertain whether customers would actually benefit, and it is a
> significant API change.
> # ??
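Solution 2 could take the form of a per-SSP callback alongside the existing task-wide notification. The interface `SspEndOfStreamListener` and the class `EndOfStreamDispatcher` below are purely hypothetical (nothing like them exists in Samza), and SSPs are plain strings to keep the sketch self-contained. The per-SSP callback fires as soon as one SSP ends, while the task-wide callback fires only once every SSP has ended, as described for EndOfStreamListenerTask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical per-SSP end-of-stream callback (not part of the Samza API). */
interface SspEndOfStreamListener {
  void onSspEndOfStream(String ssp);
}

/** Hypothetical dispatcher contrasting per-SSP and task-wide end-of-stream notification. */
final class EndOfStreamDispatcher {
  private final Set<String> pending;
  private final SspEndOfStreamListener perSsp;
  private final Runnable taskWide;

  EndOfStreamDispatcher(Set<String> ssps, SspEndOfStreamListener perSsp, Runnable taskWide) {
    this.pending = new HashSet<>(ssps);
    this.perSsp = perSsp;
    this.taskWide = taskWide;
  }

  /** Called when an end-of-stream envelope arrives for the given SSP. */
  void endOfStream(String ssp) {
    if (!pending.remove(ssp)) {
      return; // unknown SSP or already ended: ignore
    }
    perSsp.onSspEndOfStream(ssp); // per-SSP granularity: fires right away
    if (pending.isEmpty()) {
      taskWide.run(); // task-wide: fires only after every SSP has ended
    }
  }

  public static void main(String[] args) {
    List<String> events = new ArrayList<>();
    Set<String> ssps = new HashSet<>(Arrays.asList("bounded-0", "bounded-1"));
    EndOfStreamDispatcher d = new EndOfStreamDispatcher(ssps, events::add, () -> events.add("task"));
    d.endOfStream("bounded-0");
    d.endOfStream("bounded-1");
    System.out.println(events); // [bounded-0, bounded-1, task]
  }
}
```

If the task also consumed an unbounded SSP, only the per-SSP callback would ever fire for the bounded input.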
> *Recommendation:*
> Solution 1 above is cheap and minimizes changes to the externally facing
> Samza API, as InMemorySystem is primarily used internally by Samza's
> test runner.
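Solution 1 amounts to skipping the trailing end-of-stream envelope when computing newestOffset. A minimal sketch, assuming the buffer's last element is that envelope; `ProposedInMemoryNewest` is a hypothetical name, and upcomingOffset is deliberately left out because the proposal leaves its semantics unchanged.

```java
import java.util.List;

/** Hypothetical sketch of newestOffset under solution 1. */
final class ProposedInMemoryNewest {
  /**
   * @param buffer envelopes buffered for one partition; the last element is
   *               assumed to be the end-of-stream envelope
   * @return index of the message immediately preceding the end-of-stream
   *         envelope, or null when no data message is buffered
   */
  static String newestOffset(List<?> buffer) {
    int dataMessages = buffer.size() - 1; // exclude the trailing end-of-stream envelope
    if (dataMessages <= 0) {
      return null; // only the end-of-stream envelope (or nothing) is buffered
    }
    return String.valueOf(dataMessages - 1);
  }
}
```

For a buffer of two data messages plus the end-of-stream envelope this returns "1", so the last offset RunLoop delivers matches the newest offset fetched at init time.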
--
This message was sent by Atlassian Jira
(v8.3.4#803005)