[ https://issues.apache.org/jira/browse/SAMZA-2300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cameron Lee reassigned SAMZA-2300:
----------------------------------

    Assignee:     (was: Cameron Lee)

> Incomplete propagation of end-of-stream messages for intermediate stream operators
> ----------------------------------------------------------------------------------
>
>                 Key: SAMZA-2300
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2300
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Cameron Lee
>            Priority: Major
>
> Summary:
> If an intermediate operator (e.g. partitionBy) corresponds to multiple 
> partitions, and the intermediate system returns 
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET as the offset of end-of-stream 
> messages, then those end-of-stream messages may not be properly propagated to 
> the other partitions.
> More context:
> End-of-stream propagation currently works by aggregating all end-of-stream 
> messages in a single partition and then broadcasting once that partition has 
> received the expected number of end-of-stream messages. This means a single 
> partition needs to wait for multiple end-of-stream messages. However, when 
> SystemConsumers finds an end-of-stream message, it marks the stream so that 
> it is no longer polled for more messages. As a result, the aggregate 
> partition may not consume all of the end-of-stream messages, so it does not 
> broadcast to the other partitions.
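The following sketch simulates that failure mode in plain Java. It assumes three upstream tasks that each send one end-of-stream message into the aggregate partition, and a consumer that stops polling as soon as it sees the end-of-stream offset. `EosAggregationBug`, `Envelope`, and `countAggregatedEos` are hypothetical names for illustration, not Samza APIs, and the string value of the offset constant is made up. Records require Java 16 or later.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical simulation of the aggregate partition; names are illustrative, not Samza APIs.
public class EosAggregationBug {
    // Made-up stand-in for the special end-of-stream offset value.
    static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

    // Simplified envelope: an offset plus the name of the task that sent an
    // end-of-stream message (null would mean an ordinary data message).
    record Envelope(String offset, String eosTaskName) {}

    /**
     * Feeds one end-of-stream envelope per upstream task into the aggregate
     * partition and returns how many of them the aggregator actually counts.
     */
    static int countAggregatedEos(int upstreamTasks, boolean stopPollingOnEosOffset) {
        Deque<Envelope> partition = new ArrayDeque<>();
        for (int i = 0; i < upstreamTasks; i++) {
            partition.add(new Envelope(END_OF_STREAM_OFFSET, "task-" + i));
        }
        int counted = 0;
        while (!partition.isEmpty()) {
            Envelope env = partition.poll();
            if (env.eosTaskName() != null) {
                counted++;
            }
            // Mimics the reported behavior: the first envelope carrying the
            // special offset marks the partition as finished, so the remaining
            // end-of-stream messages are never consumed.
            if (stopPollingOnEosOffset && END_OF_STREAM_OFFSET.equals(env.offset())) {
                break;
            }
        }
        return counted;
    }

    public static void main(String[] args) {
        int tasks = 3;
        int counted = countAggregatedEos(tasks, true);
        // The broadcast to the other partitions only happens when all are counted.
        System.out.println("counted " + counted + " of " + tasks
            + ", broadcast=" + (counted == tasks));
    }
}
```

Running `main` prints `counted 1 of 3, broadcast=false`. Passing `false` for `stopPollingOnEosOffset` lets all three messages be counted, which is what the broadcast depends on.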
> This issue was found while trying to migrate some tests to use the in-memory 
> system. The in-memory system explicitly sets the offset to 
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET. Existing tests that use the 
> in-memory system only have a single partition, so they work. We have probably 
> not seen this bug in real use cases because the Kafka system does not set 
> the offset to IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
> This issue might have been avoided if IncomingMessageEnvelope did not have 
> two different signals for an end-of-stream message (i.e. END_OF_STREAM_OFFSET 
> in the "offset" field and EndOfStreamMessage in the "message" field).
> Fix:
> To fix this, we can change IncomingMessageEnvelope.isEndOfStream to check the 
> message type. We also need to check whether the EndOfStreamMessage taskName 
> is non-null. A non-null taskName means that the end-of-stream message came 
> from inside Samza, and we do not want to stop polling in that case.
> We should also hide END_OF_STREAM_OFFSET, since it should not be needed as 
> part of the external API.
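A minimal sketch of the proposed check follows. It uses simplified stand-in types: `EosCheckSketch`, `Envelope`, `EndOfStreamMessage`, and the `endOfStream` / `isEndOfStreamByOffset` helpers are hypothetical and deliberately not the real Samza classes. It assumes Java 16+ for records and pattern matching. It contrasts an offset-based check with a message-type check, and keeps the offset constant private.

```java
// Hypothetical, simplified stand-ins for illustration; not the actual Samza classes.
public class EosCheckSketch {
    // Simplified control message. A non-null taskName means another Samza
    // task produced it (e.g. on an intermediate stream).
    record EndOfStreamMessage(String taskName) {}

    record Envelope(String offset, Object message) {
        // Private, so the special offset is no longer part of the external API.
        private static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

        // Builds an end-of-stream envelope; taskName is null for a source stream.
        static Envelope endOfStream(String taskName) {
            return new Envelope(END_OF_STREAM_OFFSET, new EndOfStreamMessage(taskName));
        }

        // Offset-based check: also fires for end-of-stream messages that
        // Samza forwards between tasks, which stops polling too early.
        boolean isEndOfStreamByOffset() {
            return END_OF_STREAM_OFFSET.equals(offset);
        }

        // Proposed check: inspect the message type, and only report end of
        // stream for messages that did not come from inside Samza.
        boolean isEndOfStream() {
            return message instanceof EndOfStreamMessage eos && eos.taskName() == null;
        }
    }

    public static void main(String[] args) {
        Envelope fromSource = Envelope.endOfStream(null);
        Envelope fromTask = Envelope.endOfStream("task-1");
        Envelope data = new Envelope("42", "payload");
        System.out.println(fromSource.isEndOfStream());       // true
        System.out.println(fromTask.isEndOfStream());         // false
        System.out.println(fromTask.isEndOfStreamByOffset()); // true
        System.out.println(data.isEndOfStream());             // false
    }
}
```

The internal message from `task-1` is the case the offset-based check mishandles. It reports end-of-stream and would stop polling the aggregate partition. The message-type check keeps polling so the remaining end-of-stream messages can still be aggregated.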



--
This message was sent by Atlassian Jira
(v8.3.4#803005)