[https://issues.apache.org/jira/browse/SAMZA-2506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]

Brett Konold updated SAMZA-2506:
--------------------------------
    Summary: Inconsistent end of stream semantics in 
SystemStreamPartitionMetadata  (was: Inconsistent End of stream offset 
semantics in SystemStreamPartitionMetadata offsets)

> Inconsistent end of stream semantics in SystemStreamPartitionMetadata
> ---------------------------------------------------------------------
>
>                 Key: SAMZA-2506
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2506
>             Project: Samza
>          Issue Type: New Feature
>            Reporter: Brett Konold
>            Priority: Major
>
> When consuming a bounded stream (HdfsSystemConsumer or 
> InMemorySystemConsumer), an IncomingMessageEnvelope with object = 
> EndOfStreamMessage and offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET 
> is used to delimit the stream and signal that there is no more data to be 
> consumed.
> It is problematic, however, that InMemorySystemAdmin, HdfsSystemAdmin, and
> SystemStreamPartitionMetadata each use "oldestOffset", "newestOffset", and
> "upcomingOffset" differently for these bounded streams.
> As described in SystemStreamPartitionMetadata:
> {code:java}
> /**
>  * @return The oldest offset that still exists in the stream for the
>  *         partition given. If a partition has two messages with offsets 0
>  *         and 1, respectively, then this method would return 0 for the
>  *         oldest offset. This offset is useful when one wishes to read all
>  *         messages in a stream from the very beginning. A null value means
>  *         the stream is empty.
>  */
> public String getOldestOffset() {
>   return oldestOffset;
> }
> /**
>  * @return The newest offset that exists in the stream for the partition
>  *         given. If a partition has two messages with offsets 0 and 1,
>  *         respectively, then this method would return 1 for the newest
>  *         offset. This offset is useful when one wishes to see if all
>  *         messages have been read from a stream (offset of last message
>  *         read == newest offset). A null value means the stream is empty.
>  */
> public String getNewestOffset() {
>   return newestOffset;
> }
> /**
>  * @return The offset that represents the next message to be written in the
>  *         stream for the partition given. If a partition has two messages
>  *         with offsets 0 and 1, respectively, then this method would return
>  *         2 for the upcoming offset. This offset is useful when one wishes
>  *         to pick up reading at the very end of a stream. A null value
>  *         means the stream is empty.
>  */
> public String getUpcomingOffset() {
>   return upcomingOffset;
> }
> {code}
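A minimal sketch of the contract the Javadoc above describes, assuming a partition whose messages carry consecutive numeric offsets starting at 0. DocumentedOffsets and forMessageCount are hypothetical names used only for illustration; this is not the Samza class.

{code:java}
/**
 * Hypothetical model of the offset triple documented in
 * SystemStreamPartitionMetadata; not the Samza class itself.
 */
final class DocumentedOffsets {
  final String oldest;
  final String newest;
  final String upcoming;

  DocumentedOffsets(String oldest, String newest, String upcoming) {
    this.oldest = oldest;
    this.newest = newest;
    this.upcoming = upcoming;
  }

  /** Triple for a partition holding messages at offsets 0..messageCount-1. */
  static DocumentedOffsets forMessageCount(int messageCount) {
    if (messageCount == 0) {
      // Per the Javadoc, null means the stream is empty.
      return new DocumentedOffsets(null, null, null);
    }
    return new DocumentedOffsets(
        "0",                               // oldest: first message
        String.valueOf(messageCount - 1),  // newest: last message
        String.valueOf(messageCount));     // upcoming: next message to be written
  }

  public static void main(String[] args) {
    DocumentedOffsets m = forMessageCount(2);
    System.out.println(m.oldest + " " + m.newest + " " + m.upcoming); // prints "0 1 2"
  }
}
{code}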
>  
> Areas of concern are highlighted below in red.
>  
> In the case of HdfsSystemConsumer, messages are read from HDFS files until
> all have been read, [and an end of stream envelope is appended to the end of
> the message buffer|#L237]. The offset metadata returned per partition is:
> *oldestOffset*: offset of the beginning of the _first_ file in HDFS
> {color:#ff0000}*newestOffset*: offset of the beginning of the _last_ file in
> HDFS{color}
>  * {color:#ff0000}Does NOT count the end of stream envelope in the numeric
> offset{color}
> {color:#ff0000}*upcomingOffset*: null{color}
>  * {color:#ff0000}Per SystemStreamPartitionMetadata, null is meant to
> indicate an empty stream. However, null seems reasonable here, since
> "upcoming" does not make much sense for a bounded stream.{color}
> reference: 
> [https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java#L210]
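The HDFS metadata shape above can be modeled roughly as follows. This is a hypothetical sketch: HdfsStyleMetadata is not Samza code, offsets are written as "fileIndex:recordIndex" purely for illustration (this is not the real samza-hdfs offset format), and END_OF_STREAM_OFFSET is a local stand-in for IncomingMessageEnvelope.END_OF_STREAM_OFFSET.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class HdfsStyleMetadata {
  // Local stand-in for IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
  static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

  /** Offsets emitted for the given record counts per file (illustrative format). */
  static List<String> emittedOffsets(int[] recordsPerFile) {
    List<String> offsets = new ArrayList<>();
    for (int f = 0; f < recordsPerFile.length; f++) {
      for (int r = 0; r < recordsPerFile[f]; r++) {
        offsets.add(f + ":" + r);
      }
    }
    offsets.add(END_OF_STREAM_OFFSET); // appended once every file has been read
    return offsets;
  }

  /** {oldest, newest, upcoming} as described for HdfsSystemAdmin above. */
  static String[] metadata(int fileCount) {
    if (fileCount == 0) {
      return new String[] {null, null, null}; // null means empty, per the Javadoc
    }
    return new String[] {
        "0:0",                   // beginning of the first file
        (fileCount - 1) + ":0",  // beginning of the last file; EOS not counted
        null                     // no upcoming offset for a bounded stream
    };
  }

  public static void main(String[] args) {
    int[] files = {2, 3};
    System.out.println(emittedOffsets(files));
    // prints "[0:0, 0:1, 1:0, 1:1, 1:2, END_OF_STREAM]"
    System.out.println(Arrays.toString(metadata(files.length)));
    // prints "[0:0, 1:0, null]"
  }
}
{code}

In this model, newestOffset ("1:0" for two files) points at the start of the last file, and the trailing end of stream envelope is not reflected in it.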
>  
>  
>  
> {color:#172b4d}Further differences are seen in InMemorySystem, where an 
> IncomingMessageEnvelope is [placed at the end of the buffer|#L72] similar to 
> the HdfsSystemConsumer case above, with an object = EndOfStreamMessage and 
> offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET.{color}
> {color:#172b4d}However, InMemorySystemAdmin will generate the following 
> metadata per partition:{color}
> {color:#172b4d}*oldestOffset*: 0{color}
> {color:#de350b}*newestOffset*: numeric index of the last message in the
> buffer{color}
>  * {color:#de350b}This will be the index of the end of stream message, which
> does not match the semantics of the HdfsSystemConsumer case above{color}
> {color:#de350b}*upcomingOffset*: newestOffset + 1{color}
>  * {color:#de350b}This is reasonable, but again, "upcoming" does not make
> much sense for a bounded stream{color}
> reference: 
> [https://github.com/Sanil15/samza/blob/master/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java#L182]
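For comparison, a hypothetical sketch of the InMemorySystemAdmin behavior described above, assuming data messages are offset by their index in the buffer. InMemoryStyleMetadata is an illustrative name, not Samza code, and END_OF_STREAM_OFFSET is a local stand-in. With two data messages, newestOffset comes out as 2, the index held by the end of stream envelope, whose own offset is the sentinel, so no envelope ever carries offset 2.

{code:java}
import java.util.ArrayList;
import java.util.List;

final class InMemoryStyleMetadata {
  // Local stand-in for IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
  static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

  /** Offsets in a buffer of dataCount messages followed by the EOS envelope. */
  static List<String> bufferOffsets(int dataCount) {
    List<String> offsets = new ArrayList<>();
    for (int i = 0; i < dataCount; i++) {
      offsets.add(String.valueOf(i)); // assumption: offset == buffer index
    }
    offsets.add(END_OF_STREAM_OFFSET); // EOS envelope occupies index dataCount
    return offsets;
  }

  /** {oldest, newest, upcoming} as described for InMemorySystemAdmin above. */
  static String[] metadata(List<String> buffer) {
    int lastIndex = buffer.size() - 1; // index of the EOS envelope
    return new String[] {"0", String.valueOf(lastIndex), String.valueOf(lastIndex + 1)};
  }

  public static void main(String[] args) {
    List<String> buffer = bufferOffsets(2);
    String[] m = metadata(buffer);
    System.out.println(buffer);                // prints "[0, 1, END_OF_STREAM]"
    System.out.println(String.join(",", m));   // prints "0,2,3"
    System.out.println(buffer.contains(m[1])); // prints "false"
  }
}
{code}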
>  
>  
> *Impact:*
> tl;dr: this blocks refactoring side input consumption to leverage RunLoop.
> Side input consumption currently blocks the main container startup thread in
> ContainerStorageManager (CSM) until, for each side input SSP:
>  # envelope from SystemConsumer returns true for envelope.isEndOfStream (the 
> offset of the envelope is IncomingMessageEnvelope.END_OF_STREAM_OFFSET)
>  # envelope offset is equal to newest offset for that SSP fetched at CSM 
> initialization
> We would like to refactor this flow to leverage RunLoop, rather than using
> SystemConsumers directly, to take advantage of its message dispatching,
> concurrency, etc.
> Since RunLoop [sinks end of stream messages|#L706], condition 1 can no longer
> be checked, so CSM needs a consistent way to determine that consumption of an
> SSP has reached its "head" or "end" according to the offset metadata fetched
> at init time.
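The check CSM is left with once end of stream envelopes no longer reach it can be sketched as below. CaughtUpCheck and reachedNewest are hypothetical names, and treating a null newest offset as "already caught up" is an assumption based on the Javadoc's "null means empty". The main method shows why InMemorySystem's current newestOffset is never matched.

{code:java}
import java.util.List;

/** Hypothetical offset-only "caught up" check; not CSM's actual code. */
final class CaughtUpCheck {
  /** True once a delivered offset equals the newest offset fetched at init. */
  static boolean reachedNewest(List<String> deliveredOffsets, String newestOffsetAtInit) {
    if (newestOffsetAtInit == null) {
      return true; // assumption: null newest offset means the partition is empty
    }
    for (String offset : deliveredOffsets) {
      if (newestOffsetAtInit.equals(offset)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // InMemorySystem today: data offsets "0" and "1" are delivered, the EOS
    // envelope is dropped, and newestOffset is "2" (the EOS position).
    System.out.println(reachedNewest(List.of("0", "1"), "2")); // prints "false"
    // If newestOffset instead pointed at the last data message:
    System.out.println(reachedNewest(List.of("0", "1"), "1")); // prints "true"
  }
}
{code}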
> *Possible (partial) reconciliation:*
>  # Modify InMemorySystemAdmin to return "newestOffset" as the numeric index
> of the buffered message immediately preceding the end of stream message. This
> would make the semantics of "newestOffset" match those of HdfsSystemAdmin.
>  ## (+) Minimal semantic change in a component used primarily internally in
> Samza (InMemorySystem is used in the test runner)
>  ## (-) Semantics of "upcomingOffset" would be unchanged and would remain
> inconsistent with HdfsSystemAdmin
>  # Modify RunLoop to pass end of stream messages through to tasks, either
> directly or by invoking a callback
>  ## (+) EndOfStreamListenerTask today is task-wide and is only invoked when
> ALL SSPs in a task reach end of stream. SSP granularity might be welcome
> flexibility for customers who intermingle bounded and unbounded streams in
> the same task.
>  ## (-) It is uncertain whether customers would actually benefit, and this
> would be a significant API change.
>  # ??
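Option 2 could look roughly like the toy dispatcher below, which hands each end of stream to a per-SSP hook and still fires a task-wide notification only once every SSP has ended. SspEndOfStreamCallback and ToyDispatcher are hypothetical and are not existing Samza or RunLoop APIs; SSPs are plain strings for brevity.

{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical per-SSP end of stream hook; not an existing Samza interface. */
interface SspEndOfStreamCallback {
  void onEndOfStream(String ssp);
}

/** Toy dispatcher that surfaces EOS per SSP instead of silently dropping it. */
final class ToyDispatcher {
  static final String END_OF_STREAM_OFFSET = "END_OF_STREAM"; // local stand-in
  private final Set<String> pending;
  private final SspEndOfStreamCallback perSsp;
  private final Runnable taskWide; // analogous to today's task-wide notification

  ToyDispatcher(Set<String> ssps, SspEndOfStreamCallback perSsp, Runnable taskWide) {
    this.pending = new HashSet<>(ssps);
    this.perSsp = perSsp;
    this.taskWide = taskWide;
  }

  void dispatch(String ssp, String offset) {
    if (!END_OF_STREAM_OFFSET.equals(offset)) {
      return; // a real loop would hand the message to the task here
    }
    perSsp.onEndOfStream(ssp); // fires as soon as this SSP ends
    if (pending.remove(ssp) && pending.isEmpty()) {
      taskWide.run(); // fires only once ALL SSPs have ended
    }
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    ToyDispatcher d = new ToyDispatcher(
        Set.of("a", "b"), ssp -> log.add("ssp " + ssp), () -> log.add("task"));
    d.dispatch("a", "0");
    d.dispatch("a", END_OF_STREAM_OFFSET);
    d.dispatch("b", END_OF_STREAM_OFFSET);
    System.out.println(log); // prints "[ssp a, ssp b, task]"
  }
}
{code}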
> *Recommendation:*
> Solution 1 above is cheap and minimizes changes to the externally facing
> Samza API, as InMemorySystem is used primarily in the internals of Samza's
> test runner.
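A hypothetical sketch of Solution 1, reporting newestOffset as the index of the message just before the end of stream envelope. ProposedInMemoryMetadata is an illustrative name, not Samza code; keeping upcomingOffset as newestOffset + 1 and returning nulls for a buffer that holds only the end of stream envelope are assumptions.

{code:java}
import java.util.List;

final class ProposedInMemoryMetadata {
  // Local stand-in for IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
  static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

  /** {oldest, newest, upcoming} for a buffer whose last entry is the EOS envelope. */
  static String[] metadata(List<String> bufferOffsets) {
    int eosIndex = bufferOffsets.size() - 1;
    if (eosIndex < 0 || !END_OF_STREAM_OFFSET.equals(bufferOffsets.get(eosIndex))) {
      throw new IllegalArgumentException("expected a buffer terminated by EOS");
    }
    int newest = eosIndex - 1; // last data message, skipping the EOS envelope
    if (newest < 0) {
      return new String[] {null, null, null}; // only EOS: treated as empty (assumption)
    }
    // Assumption: upcoming keeps its "newest + 1" derivation.
    return new String[] {"0", String.valueOf(newest), String.valueOf(newest + 1)};
  }

  public static void main(String[] args) {
    String[] m = metadata(List.of("0", "1", END_OF_STREAM_OFFSET));
    System.out.println(String.join(",", m)); // prints "0,1,2"
  }
}
{code}

With this change, the last delivered data offset ("1") equals newestOffset, so an offset-only check like the one CSM would need can terminate.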



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
