[ https://issues.apache.org/jira/browse/SAMZA-2506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brett Konold updated SAMZA-2506:
--------------------------------
    Description: 
When consuming a bounded stream (HdfsSystemConsumer or InMemorySystemConsumer), 
an IncomingMessageEnvelope with object = EndOfStreamMessage and offset = 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET is used to delimit the stream and 
signal that there is no more data to be consumed.

However, InMemorySystemAdmin, HdfsSystemAdmin, and SystemStreamPartitionMetadata 
each interpret "oldestOffset", "newestOffset", and "upcomingOffset" differently, 
which is problematic.

As described in SystemStreamPartitionMetadata:
{code:java}
/**
 * @return The oldest offset that still exists in the stream for the
 *         partition given. If a partition has two messages with offsets 0
 *         and 1, respectively, then this method would return 0 for the
 *         oldest offset. This offset is useful when one wishes to read all
 *         messages in a stream from the very beginning. A null value means
 *         the stream is empty.
 */
public String getOldestOffset() {
  return oldestOffset;
}

/**
 * @return The newest offset that exists in the stream for the partition
 *         given. If a partition has two messages with offsets 0 and 1,
 *         respectively, then this method would return 1 for the newest
 *         offset. This offset is useful when one wishes to see if all
 *         messages have been read from a stream (offset of last message
 *         read == newest offset). A null value means the stream is empty.
 */
public String getNewestOffset() {
  return newestOffset;
}

/**
 * @return The offset that represents the next message to be written in the
 *         stream for the partition given. If a partition has two messages
 *         with offsets 0 and 1, respectively, then this method would return
 *         2 for the upcoming offset. This offset is useful when one wishes
 *         to pick up reading at the very end of a stream. A null value
 *         means the stream is empty.
 */
public String getUpcomingOffset() {
  return upcomingOffset;
}
{code}
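To make the documented contract concrete, the following is a minimal sketch that models it for a partition whose messages carry consecutive numeric offsets (an assumption; real systems may use other offset formats). OffsetContract, fromOffsets, and hasReadAll are hypothetical names used only for illustration, not Samza APIs.

{code:java}
import java.util.List;

// Hypothetical model of the contract documented on SystemStreamPartitionMetadata,
// assuming messages carry consecutive numeric offsets.
final class OffsetContract {
  final String oldestOffset;   // offset of the first message still in the partition
  final String newestOffset;   // offset of the last message in the partition
  final String upcomingOffset; // offset the next written message would receive

  OffsetContract(String oldestOffset, String newestOffset, String upcomingOffset) {
    this.oldestOffset = oldestOffset;
    this.newestOffset = newestOffset;
    this.upcomingOffset = upcomingOffset;
  }

  // Builds metadata for a partition holding the given offsets; per the Javadoc,
  // null values mean the partition is empty.
  static OffsetContract fromOffsets(List<Long> offsets) {
    if (offsets.isEmpty()) {
      return new OffsetContract(null, null, null);
    }
    long oldest = offsets.get(0);
    long newest = offsets.get(offsets.size() - 1);
    return new OffsetContract(
        String.valueOf(oldest), String.valueOf(newest), String.valueOf(newest + 1));
  }

  // "Offset of last message read == newest offset" means everything has been read.
  static boolean hasReadAll(String lastReadOffset, OffsetContract metadata) {
    return metadata.newestOffset != null && metadata.newestOffset.equals(lastReadOffset);
  }
}
{code}

For a partition holding offsets 0 and 1, fromOffsets returns oldest "0", newest "1", and upcoming "2", and hasReadAll("1", ...) is true, matching the Javadoc examples.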

In the case of HdfsSystemConsumer, messages are read from HDFS files until all 
have been read, [and an end-of-stream envelope is appended to the end of the 
message buffer|#L237]. The offset metadata returned per partition is:

*oldestOffset*: offset of the beginning of the _first_ file in HDFS

{color:#ff0000}*newestOffset*: offset of the beginning of the _last_ file in HDFS{color}
 * {color:#ff0000}Does NOT count the end-of-stream envelope in the numeric offset{color}

{color:#ff0000}*upcomingOffset*: null{color}
 * {color:#ff0000}Per SystemStreamPartitionMetadata, null is meant to indicate 
an empty stream. However, null seems reasonable here, since an "upcoming" offset 
does not make much sense for a bounded stream.{color}

reference: 
[https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java#L210]

{color:#172b4d}Further differences are seen in InMemorySystem, where, as in the 
HdfsSystemConsumer case above, an IncomingMessageEnvelope with object = 
EndOfStreamMessage and offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET is 
[placed at the end of the buffer|#L72].{color}

{color:#172b4d}However, InMemorySystemAdmin will generate the following 
metadata per partition:{color}

{color:#172b4d}*oldestOffset*: 0{color}

{color:#de350b}*newestOffset*: numeric index of the last message in the 
buffer{color}
 * {color:#de350b}This is the index of the end-of-stream message, which does not 
match the HdfsSystemConsumer semantics above{color}

{color:#de350b}*upcomingOffset*: newestOffset + 1{color}
 * {color:#de350b}This is reasonable, but again, an "upcoming" offset does not 
make much sense for bounded streams{color}

reference: 
[https://github.com/Sanil15/samza/blob/master/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java#L182]
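The in-memory behavior described above can be modeled roughly as follows. This is a simplified, hypothetical sketch and not the actual InMemoryManager code: the buffer is a plain list, and END_OF_STREAM is a local sentinel standing in for the EndOfStreamMessage envelope at its tail. Returning nulls for an empty buffer is also an assumption.

{code:java}
import java.util.List;

// Simplified, hypothetical model of how InMemorySystemAdmin derives per-partition
// metadata today, as described in this issue (not the real InMemoryManager code).
final class InMemoryMetadataToday {
  // Sentinel standing in for the EndOfStreamMessage envelope of a bounded buffer.
  static final Object END_OF_STREAM = new Object();

  // Returns {oldest, newest, upcoming}. newest is the index of the last buffered
  // element, which for a bounded stream is the end-of-stream marker itself.
  static String[] metadataFor(List<Object> buffer) {
    if (buffer.isEmpty()) {
      return new String[] {null, null, null}; // assumption: empty buffer -> nulls
    }
    int newest = buffer.size() - 1;
    return new String[] {"0", String.valueOf(newest), String.valueOf(newest + 1)};
  }
}
{code}

For a buffer [m0, m1, END_OF_STREAM] this yields oldest "0", newest "2", and upcoming "3", so newestOffset points at the end-of-stream slot rather than at m1.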

*Impact:*

*tl;dr:* this blocks refactoring side input consumption to leverage RunLoop.

Side input consumption currently blocks the main container startup thread in 
ContainerStorageManager (CSM) until one of the following holds for each side 
input SSP:
 # the SystemConsumer returns an envelope for which envelope.isEndOfStream() is 
true (i.e., the envelope's offset is IncomingMessageEnvelope.END_OF_STREAM_OFFSET)
 # the envelope's offset equals the newest offset for that SSP, as fetched at CSM 
initialization
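The blocking check described above can be sketched per SSP as follows, assuming (as the RunLoop discussion below implies) that either condition on its own is enough to mark an SSP as caught up. This is a hypothetical simplification rather than ContainerStorageManager's code: SSPs are plain strings, offsets are compared with equals, and END_OF_STREAM_OFFSET is a local stand-in for IncomingMessageEnvelope.END_OF_STREAM_OFFSET.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the side input "caught up" check described above.
final class SideInputCatchUp {
  // Local stand-in for IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
  static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

  private final Map<String, String> newestOffsetAtInit; // SSP -> newest offset at init
  private final Set<String> caughtUp = new HashSet<>();

  SideInputCatchUp(Map<String, String> newestOffsetAtInit) {
    this.newestOffsetAtInit = new HashMap<>(newestOffsetAtInit);
  }

  // Called for every envelope read for an SSP. Marks the SSP caught up if either
  // condition 1 (end-of-stream envelope) or condition 2 (offset == newest) holds.
  void onEnvelope(String ssp, String offset) {
    boolean endOfStream = END_OF_STREAM_OFFSET.equals(offset);
    boolean reachedNewest = offset != null && offset.equals(newestOffsetAtInit.get(ssp));
    if (endOfStream || reachedNewest) {
      caughtUp.add(ssp);
    }
  }

  // Container startup stays blocked until this returns true.
  boolean allCaughtUp() {
    return caughtUp.containsAll(newestOffsetAtInit.keySet());
  }
}
{code}

With the in-memory metadata described earlier (newest offset "2" for [m0, m1, EOS]), offsets "0" and "1" never satisfy condition 2; only the end-of-stream envelope (condition 1) releases the SSP.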

We would like to refactor this flow to leverage RunLoop rather than using 
SystemConsumers directly, in order to take advantage of its message dispatching, 
concurrency, etc.

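Since RunLoop [sinks end of stream 
messages|https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/RunLoop.java#L706] 
instead of passing them on, condition 1 can no longer be checked, and CSM needs a 
consistent way to determine that an SSP's consumption has reached "head" or 
"end" according to the offset metadata fetched at init time.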
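When RunLoop filters out end-of-stream envelopes, CSM is left with condition 2 alone. The hypothetical sketch below models that situation for a single SSP: the envelope offsets as read from the consumer, the end-of-stream envelope dropped as RunLoop would, and the remaining offsets compared with the newest offset fetched at init. The names and the stand-in END_OF_STREAM_OFFSET value are assumptions.

{code:java}
import java.util.List;

// Hypothetical sketch: CSM's view of one side input SSP when RunLoop sinks
// end-of-stream envelopes, so only condition 2 (offset == newest) can fire.
final class CatchUpWithoutEndOfStream {
  static final String END_OF_STREAM_OFFSET = "END_OF_STREAM"; // stand-in value

  // Returns true if any offset that survives RunLoop's filtering equals the
  // newest offset fetched at init.
  static boolean caughtUp(List<String> envelopeOffsets, String newestOffsetAtInit) {
    return envelopeOffsets.stream()
        .filter(offset -> !END_OF_STREAM_OFFSET.equals(offset)) // what RunLoop drops
        .anyMatch(offset -> offset.equals(newestOffsetAtInit));
  }
}
{code}

Given InMemorySystemAdmin's current newest offset of "2" for [m0, m1, EOS], caughtUp(List.of("0", "1", END_OF_STREAM_OFFSET), "2") is false, so the check would never pass; with a newest offset of "1" it returns true.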

*Possible (partial) reconciliations:*
 # Modify InMemorySystemAdmin to return "newestOffset" as the numeric index of 
the buffered message immediately preceding the end-of-stream message. This would 
make the semantics of "newestOffset" match those of HdfsSystemAdmin.
 ## (-) The semantics of "upcomingOffset" would be unchanged and left 
inconsistent with HdfsSystemAdmin.
 # Modify RunLoop to pass end-of-stream messages through to tasks, either 
directly or by invoking a callback.
 ## (+) Today, EndOfStreamListenerTask is task-wide and is only called when ALL 
SSPs in a task reach end of stream. Per-SSP granularity could give welcome 
flexibility to customers who mix bounded and unbounded streams in the same task.
 ## (-) It is uncertain whether customers would actually benefit, and this is a 
significant API change.
 # ??
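Option 1 could look roughly like the sketch below. It uses a simplified, hypothetical buffer model (a plain list with a sentinel standing in for the EndOfStreamMessage envelope) and is not a patch against InMemoryManager; treating a buffer that contains only the end-of-stream marker as empty is an assumption.

{code:java}
import java.util.List;

// Hypothetical sketch of option 1: report newestOffset as the index of the last
// real message, skipping a trailing end-of-stream marker.
final class ProposedInMemoryMetadata {
  // Sentinel standing in for the EndOfStreamMessage envelope.
  static final Object END_OF_STREAM = new Object();

  // Returns {oldest, newest, upcoming}. upcoming is computed as newest + 1 in this
  // sketch; either way it stays non-null, unlike HdfsSystemAdmin's.
  static String[] metadataFor(List<Object> buffer) {
    int lastIndex = buffer.size() - 1;
    // Reference comparison is intentional: END_OF_STREAM is a unique sentinel.
    if (lastIndex >= 0 && buffer.get(lastIndex) == END_OF_STREAM) {
      lastIndex--; // exclude the end-of-stream marker from the offset count
    }
    if (lastIndex < 0) {
      return new String[] {null, null, null}; // assumption: no real messages -> empty
    }
    return new String[] {"0", String.valueOf(lastIndex), String.valueOf(lastIndex + 1)};
  }
}
{code}

For [m0, m1, END_OF_STREAM] this returns oldest "0", newest "1", and upcoming "2", so the last delivered offset "1" equals newestOffset even after RunLoop drops the end-of-stream envelope.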

*Recommendation:*

Solution 1 above is cheap and minimizes changes to Samza's externally facing 
API, since InMemorySystem is used primarily within the internals of Samza's test 
runner.
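For comparison, option 2 might look roughly like the sketch below. PerSspEndOfStreamDispatcher and its callbacks are hypothetical and do not correspond to any existing Samza API; they only illustrate how per-SSP end-of-stream notifications could coexist with today's task-wide EndOfStreamListenerTask signal.

{code:java}
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of option 2: forward end-of-stream per SSP instead of
// sinking it, while still deriving the task-wide "all SSPs done" signal.
final class PerSspEndOfStreamDispatcher {
  private final Set<String> pending;               // SSPs not yet at end of stream
  private final Consumer<String> onSspEndOfStream; // hypothetical per-SSP callback
  private final Runnable onTaskEndOfStream;        // task-wide callback, as today

  PerSspEndOfStreamDispatcher(Set<String> ssps, Consumer<String> onSspEndOfStream,
      Runnable onTaskEndOfStream) {
    this.pending = new HashSet<>(ssps);
    this.onSspEndOfStream = onSspEndOfStream;
    this.onTaskEndOfStream = onTaskEndOfStream;
  }

  // Called when an end-of-stream envelope arrives for an SSP.
  void endOfStream(String ssp) {
    if (!pending.remove(ssp)) {
      return; // unknown SSP or duplicate end-of-stream: ignore
    }
    onSspEndOfStream.accept(ssp);
    if (pending.isEmpty()) {
      onTaskEndOfStream.run(); // fires once, after the last SSP finishes
    }
  }
}
{code}

Each SSP triggers its own callback once, and the task-wide callback fires only after the last SSP finishes, so the existing task-wide behavior could be preserved alongside the new per-SSP signal.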

  was:
When consuming a bounded stream (HdfsSystemConsumer or InMemorySystemConsumer), 
an IncomingMessageEnvelope with object = EndOfStreamMessage and offset = 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET is used to delimit the stream and 
signal that there is no more data to be consumed.

However, InMemorySystemAdmin, HdfsSystemAdmin, and SystemStreamPartitionMetadata 
each interpret "oldestOffset", "newestOffset", and "upcomingOffset" differently, 
which is problematic.

As described in SystemStreamPartitionMetadata:
{code:java}
/**
 * @return The oldest offset that still exists in the stream for the
 *         partition given. If a partition has two messages with offsets 0
 *         and 1, respectively, then this method would return 0 for the
 *         oldest offset. This offset is useful when one wishes to read all
 *         messages in a stream from the very beginning. A null value means
 *         the stream is empty.
 */
public String getOldestOffset() {
  return oldestOffset;
}

/**
 * @return The newest offset that exists in the stream for the partition
 *         given. If a partition has two messages with offsets 0 and 1,
 *         respectively, then this method would return 1 for the newest
 *         offset. This offset is useful when one wishes to see if all
 *         messages have been read from a stream (offset of last message
 *         read == newest offset). A null value means the stream is empty.
 */
public String getNewestOffset() {
  return newestOffset;
}

/**
 * @return The offset that represents the next message to be written in the
 *         stream for the partition given. If a partition has two messages
 *         with offsets 0 and 1, respectively, then this method would return
 *         2 for the upcoming offset. This offset is useful when one wishes
 *         to pick up reading at the very end of a stream. A null value
 *         means the stream is empty.
 */
public String getUpcomingOffset() {
  return upcomingOffset;
}
{code}

In the case of HdfsSystemConsumer, messages are read from HDFS files until all 
have been read, [and an end-of-stream envelope is appended to the end of the 
message buffer|#L237]. The offset metadata returned per partition is:

*oldestOffset*: offset of the beginning of the _first_ file in HDFS

{color:#ff0000}*newestOffset*: offset of the beginning of the _last_ file in HDFS{color}
 * {color:#ff0000}Does NOT count the end-of-stream envelope in the numeric offset{color}

{color:#ff0000}*upcomingOffset*: null{color}
 * {color:#ff0000}Per SystemStreamPartitionMetadata, null is meant to indicate 
an empty stream. However, null seems reasonable here, since an "upcoming" offset 
does not make much sense for a bounded stream.{color}

reference: 
[https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java#L210]

{color:#172b4d}Further differences are seen in InMemorySystem, where, as in the 
HdfsSystemConsumer case above, an IncomingMessageEnvelope with object = 
EndOfStreamMessage and offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET is 
[placed at the end of the buffer|#L72].{color}

{color:#172b4d}However, InMemorySystemAdmin will generate the following 
metadata per partition:{color}

{color:#172b4d}*oldestOffset*: 0{color}

{color:#de350b}*newestOffset*: numeric index of the last message in the 
buffer{color}
 * {color:#de350b}This is the index of the end-of-stream message, which does not 
match the HdfsSystemConsumer semantics above{color}

{color:#de350b}*upcomingOffset*: newestOffset + 1{color}
 * {color:#de350b}This is reasonable, but again, an "upcoming" offset does not 
make much sense for bounded streams{color}

reference: 
[https://github.com/Sanil15/samza/blob/master/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java#L182]

*Impact:*

*tl;dr:* this blocks refactoring side input consumption to leverage RunLoop.

Side input consumption currently blocks the main container startup thread in 
ContainerStorageManager (CSM) until, for each side input SSP:
 # the SystemConsumer returns an envelope for which envelope.isEndOfStream() is 
true (i.e., the envelope's offset is IncomingMessageEnvelope.END_OF_STREAM_OFFSET)
 # the envelope's offset equals the newest offset for that SSP, as fetched at CSM 
initialization

We would like to refactor this flow to leverage RunLoop rather than using 
SystemConsumers directly, in order to take advantage of its message dispatching, 
concurrency, etc.

Since RunLoop [sinks end of stream 
messages|https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/RunLoop.java#L706]

*Possible Reconciliations:*

1. 

2. 


> Inconsistent End of stream offset semantics in SystemStreamPartitionMetadata 
> offsets
> ------------------------------------------------------------------------------------
>
>                 Key: SAMZA-2506
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2506
>             Project: Samza
>          Issue Type: New Feature
>            Reporter: Brett Konold
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
