[ https://issues.apache.org/jira/browse/SAMZA-2300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cameron Lee reassigned SAMZA-2300:
----------------------------------

    Assignee: Cameron Lee

> Incomplete propagation of end-of-stream messages for intermediate stream 
> operators
> ----------------------------------------------------------------------------------
>
>                 Key: SAMZA-2300
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2300
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Cameron Lee
>            Assignee: Cameron Lee
>            Priority: Major
>
> Summary:
> If an intermediate operator (e.g. partitionBy) corresponds to multiple 
> partitions, and the intermediate system returns 
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET for end-of-stream messages, then 
> those end-of-stream messages may not get properly propagated to the other 
> partitions.
> More context:
> End-of-stream propagation currently works by aggregating all end-of-stream 
> messages in a single partition and then broadcasting once that partition 
> has received the expected number of end-of-stream messages. This means a 
> single partition needs to wait for multiple end-of-stream messages. However, 
> when SystemConsumers finds an end-of-stream message (one whose offset is 
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET), it marks the stream so that it 
> is no longer polled for more messages. As a result, the aggregate partition 
> may stop consuming before it has received all of the expected end-of-stream 
> messages, and it never broadcasts to the other partitions.
> This issue was found while trying to migrate some tests to use the in-memory 
> system. The in-memory system explicitly sets the offset to 
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET. Existing tests which use the 
> in-memory system only have a single partition, so those work. We probably 
> haven't seen this bug in real use cases because the Kafka system does not set 
> the offset to IncomingMessageEnvelope.END_OF_STREAM_OFFSET. The tests handle 
> end-of-stream Kafka messages correctly because those messages are handled by 
> the high-level operators, which don't depend on the offset being 
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
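A minimal Python model of the failure may make the interaction clearer. This is a simplified sketch under stated assumptions, not Samza code: `Envelope`, `EndOfStreamMessage`, `intermediate_partition` and `consume_aggregate_partition` are hypothetical stand-ins for IncomingMessageEnvelope, EndOfStreamMessage and the SystemConsumers polling loop, and the model assumes each upstream task writes exactly one end-of-stream message into the aggregate partition. `use_eos_offset=True` models the in-memory system; `False` models Kafka, which keeps ordinary offsets.

```python
from dataclasses import dataclass
from typing import List, Optional

# Hypothetical stand-in for IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
END_OF_STREAM_OFFSET = "END_OF_STREAM"


@dataclass
class EndOfStreamMessage:
    # Non-null when the message was produced inside Samza by an upstream task.
    task_name: Optional[str]


@dataclass
class Envelope:
    offset: str
    message: object

    def is_end_of_stream(self) -> bool:
        # Offset-based check; the modeled consumer stops polling when this is true.
        return self.offset == END_OF_STREAM_OFFSET


def intermediate_partition(upstream_tasks: List[str], use_eos_offset: bool) -> List[Envelope]:
    """Envelopes seen by the aggregate partition: one EndOfStreamMessage per upstream task."""
    return [
        Envelope(END_OF_STREAM_OFFSET if use_eos_offset else str(i), EndOfStreamMessage(task))
        for i, task in enumerate(upstream_tasks)
    ]


def consume_aggregate_partition(envelopes: List[Envelope], expected_eos: int) -> bool:
    """Return True if the aggregate partition would broadcast end-of-stream."""
    seen = 0
    for env in envelopes:
        if isinstance(env.message, EndOfStreamMessage):
            seen += 1
            if seen == expected_eos:
                return True  # all expected messages aggregated: broadcast
        if env.is_end_of_stream():
            # Models SystemConsumers no longer polling this partition.
            break
    return False


TASKS = ["Partition 0", "Partition 1", "Partition 2"]
print(consume_aggregate_partition(intermediate_partition(TASKS, use_eos_offset=True), len(TASKS)))   # False
print(consume_aggregate_partition(intermediate_partition(TASKS, use_eos_offset=False), len(TASKS)))  # True
```

With in-memory style offsets, polling stops after the first of the three messages, so the broadcast never happens; with a single partition only one message is expected, which is why the existing single-partition tests pass.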
> It is possible that this issue could have been avoided if 
> IncomingMessageEnvelope did not have two different signals for an 
> end-of-stream message (i.e. END_OF_STREAM_OFFSET for "offset" field and 
> EndOfStreamMessage for "message" field).
> Possible fixes:
>  # Change IncomingMessageEnvelope.isEndOfStream to check the message type. We 
> also need to check if the EndOfStreamMessage taskName is non-null, which 
> means that the end-of-stream message came from inside Samza, so we don't want 
> to stop polling in those cases.
>  ## This is more complicated: it changes the API semantics of what "end of 
> stream" means, it still wouldn't fully clarify what 
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET refers to, and some usages of 
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET aren't straightforward to 
> remove.
>  # Change the in-memory system implementation to only use 
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET when the EndOfStreamMessage has 
> a null task name. This would make it more similar to kafka while also keeping 
> the capability to have bounded streams in the case where there are no 
> intermediate streams (e.g. low-level API, side inputs).
>  ## This still leaves confusion regarding usage of 
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET vs. EndOfStreamMessage, but it 
> does not change API semantics, so it is a simpler and safer change. Ideally, 
> some refactoring can be done to consolidate 
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET and EndOfStreamMessage, but it 
> might not be worth it to do that now.
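Both proposed fixes can be compared in a small hypothetical Python model. This is an illustrative sketch, not Samza's implementation: `is_end_of_stream_fix1` models option 1 (stop polling only for an EndOfStreamMessage whose task name is null), `in_memory_offset_fix2` models option 2 (the in-memory system assigns the end-of-stream offset only to such messages), and `count_consumed_eos` is an assumed simplification of the polling loop. All names are made up for illustration.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional

# Hypothetical stand-in for IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
END_OF_STREAM_OFFSET = "END_OF_STREAM"


@dataclass
class EndOfStreamMessage:
    task_name: Optional[str]  # non-null: produced inside Samza by an upstream task


@dataclass
class Envelope:
    offset: str
    message: object


def is_end_of_stream_current(env: Envelope) -> bool:
    # Current behavior in this model: only the offset field is consulted.
    return env.offset == END_OF_STREAM_OFFSET


def is_end_of_stream_fix1(env: Envelope) -> bool:
    # Option 1: check the message type, and keep polling for messages from inside Samza.
    return isinstance(env.message, EndOfStreamMessage) and env.message.task_name is None


def in_memory_offset_fix2(message: object, regular_offset: str) -> str:
    # Option 2: the in-memory system uses END_OF_STREAM_OFFSET only for a null task name.
    if isinstance(message, EndOfStreamMessage) and message.task_name is None:
        return END_OF_STREAM_OFFSET
    return regular_offset


def count_consumed_eos(envelopes: List[Envelope], stop_polling: Callable[[Envelope], bool]) -> int:
    """Count end-of-stream messages consumed before polling stops."""
    seen = 0
    for env in envelopes:
        if isinstance(env.message, EndOfStreamMessage):
            seen += 1
        if stop_polling(env):
            break
    return seen


tasks = ["Partition 0", "Partition 1", "Partition 2"]
messages = [EndOfStreamMessage(t) for t in tasks]

# Today's in-memory system: every end-of-stream envelope carries END_OF_STREAM_OFFSET.
in_memory_today = [Envelope(END_OF_STREAM_OFFSET, m) for m in messages]
# Option 2: offsets assigned by in_memory_offset_fix2.
in_memory_fix2 = [Envelope(in_memory_offset_fix2(m, str(i)), m) for i, m in enumerate(messages)]

print(count_consumed_eos(in_memory_today, is_end_of_stream_current))  # 1
print(count_consumed_eos(in_memory_today, is_end_of_stream_fix1))     # 3
print(count_consumed_eos(in_memory_fix2, is_end_of_stream_current))   # 3
```

In this model option 2 still ends a bounded input stream: an EndOfStreamMessage with a null task name is given END_OF_STREAM_OFFSET, so the unchanged offset-based check stops polling for it.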



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
