Cameron Lee created SAMZA-2300:
----------------------------------

             Summary: Incomplete propagation of end-of-stream messages for 
intermediate stream operators
                 Key: SAMZA-2300
                 URL: https://issues.apache.org/jira/browse/SAMZA-2300
             Project: Samza
          Issue Type: Bug
            Reporter: Cameron Lee
            Assignee: Cameron Lee


If the intermediate stream of an operator (e.g. partitionBy) has multiple
partitions, and the intermediate system returns
IncomingMessageEnvelope.END_OF_STREAM_OFFSET as the offset of end-of-stream
messages, then those end-of-stream messages will not be properly propagated to
the other partitions.

End-of-stream propagation currently works by aggregating all end-of-stream
messages in a single partition, which broadcasts to the other partitions once it
has received the expected number of end-of-stream messages. This means that a
single partition needs to consume multiple end-of-stream messages. However, when
SystemConsumers finds an end-of-stream message, it marks the stream so that it is
no longer polled for more messages. As a result, the aggregate partition may not
consume all of the end-of-stream messages, so it never broadcasts to the other
partitions.
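
To make the failure mode concrete, here is a minimal simulation of the
aggregate-then-broadcast scheme described above. It is a sketch under stated
assumptions, not Samza code: the Envelope class, consumeAggregatePartition,
shouldBroadcast, the three upstream tasks, and the offset values are all
hypothetical stand-ins. The offset-based isEndOfStream check mirrors the
behavior this issue describes, and the loop's "break" mimics SystemConsumers no
longer polling the stream.

```java
import java.util.List;

// Hypothetical, simplified model of aggregate-then-broadcast EOS handling (not Samza's real classes).
public class EosAggregationDemo {
  static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

  // Stand-in for IncomingMessageEnvelope: an offset plus a flag marking EOS control messages.
  static final class Envelope {
    final String offset;
    final boolean isEosControlMessage;

    Envelope(String offset, boolean isEosControlMessage) {
      this.offset = offset;
      this.isEosControlMessage = isEosControlMessage;
    }

    // Offset-based check, as the issue describes the current behavior.
    boolean isEndOfStream() {
      return END_OF_STREAM_OFFSET.equals(offset);
    }
  }

  // Reads the aggregate partition and returns how many EOS control messages were
  // consumed before polling stopped.
  static int consumeAggregatePartition(List<Envelope> partition) {
    int eosSeen = 0;
    for (Envelope e : partition) {
      if (e.isEosControlMessage) eosSeen++;
      // Mimics SystemConsumers: once an end-of-stream envelope is seen, stop polling.
      if (e.isEndOfStream()) break;
    }
    return eosSeen;
  }

  // The aggregate partition broadcasts only after receiving one EOS per upstream task.
  static boolean shouldBroadcast(int eosSeen, int expectedUpstreamTasks) {
    return eosSeen == expectedUpstreamTasks;
  }

  // In-memory style: every EOS envelope carries END_OF_STREAM_OFFSET.
  static List<Envelope> inMemoryPartition() {
    return List.of(
        new Envelope("0", false),
        new Envelope(END_OF_STREAM_OFFSET, true),
        new Envelope(END_OF_STREAM_OFFSET, true),
        new Envelope(END_OF_STREAM_OFFSET, true));
  }

  // Kafka style: EOS messages keep ordinary offsets.
  static List<Envelope> kafkaLikePartition() {
    return List.of(
        new Envelope("0", false),
        new Envelope("1", true),
        new Envelope("2", true),
        new Envelope("3", true));
  }

  public static void main(String[] args) {
    int upstreamTasks = 3;
    int inMem = consumeAggregatePartition(inMemoryPartition());
    int kafka = consumeAggregatePartition(kafkaLikePartition());
    // prints "in-memory: 1 EOS, broadcast=false"
    System.out.println("in-memory: " + inMem + " EOS, broadcast=" + shouldBroadcast(inMem, upstreamTasks));
    // prints "kafka-like: 3 EOS, broadcast=true"
    System.out.println("kafka-like: " + kafka + " EOS, broadcast=" + shouldBroadcast(kafka, upstreamTasks));
  }
}
```

With in-memory style offsets, polling stops after the first end-of-stream
message, so the count never reaches the number of upstream tasks and nothing is
broadcast. With Kafka-style offsets, all three are consumed.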

This issue was found while trying to migrate some tests to use the in-memory
system. The in-memory system explicitly sets the offset to
IncomingMessageEnvelope.END_OF_STREAM_OFFSET. Existing tests that use the
in-memory system only have a single partition, so they work. We have probably not
seen this bug in real use cases because the Kafka system does not set the offset
to IncomingMessageEnvelope.END_OF_STREAM_OFFSET.

To fix this, we can change IncomingMessageEnvelope.isEndOfStream to check the
message type. We also need to check whether the taskName of the
EndOfStreamMessage is non-null: a non-null taskName means that the end-of-stream
message came from inside Samza, and in that case we don't want to stop polling.
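
One plausible reading of the proposed check is sketched below. It is
illustrative only and may differ from the actual patch: EndOfStreamMessage and
Envelope here are simplified stand-ins for Samza's classes, and the task names
and the "payload" message are made up. An end-of-stream message with a non-null
taskName (sent by an upstream Samza task) no longer stops polling, while one with
a null taskName still falls back to the END_OF_STREAM_OFFSET check.

```java
import java.util.List;

// Hypothetical sketch of the proposed isEndOfStream check; simplified stand-ins, not Samza's real classes.
public class EosFixSketch {
  static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

  // Stand-in for Samza's EndOfStreamMessage: a non-null taskName means an upstream Samza task sent it.
  static final class EndOfStreamMessage {
    private final String taskName;

    EndOfStreamMessage(String taskName) {
      this.taskName = taskName;
    }

    String getTaskName() {
      return taskName;
    }
  }

  // Stand-in for IncomingMessageEnvelope with the proposed isEndOfStream logic.
  static final class Envelope {
    final String offset;
    final Object message;

    Envelope(String offset, Object message) {
      this.offset = offset;
      this.message = message;
    }

    boolean isEndOfStream() {
      // EOS produced inside Samza for an intermediate stream: keep polling so the
      // aggregate partition can collect one EOS per upstream task.
      if (message instanceof EndOfStreamMessage
          && ((EndOfStreamMessage) message).getTaskName() != null) {
        return false;
      }
      // Otherwise fall back to the system-provided end-of-stream offset.
      return END_OF_STREAM_OFFSET.equals(offset);
    }
  }

  // Counts EOS messages consumed before polling stops (mimics SystemConsumers).
  static int consumeAggregatePartition(List<Envelope> partition) {
    int eosSeen = 0;
    for (Envelope e : partition) {
      if (e.message instanceof EndOfStreamMessage) eosSeen++;
      if (e.isEndOfStream()) break;
    }
    return eosSeen;
  }

  // In-memory style: each EOS envelope carries END_OF_STREAM_OFFSET and an upstream task name.
  static List<Envelope> inMemoryPartition() {
    return List.of(
        new Envelope("0", "payload"),
        new Envelope(END_OF_STREAM_OFFSET, new EndOfStreamMessage("Partition 0")),
        new Envelope(END_OF_STREAM_OFFSET, new EndOfStreamMessage("Partition 1")),
        new Envelope(END_OF_STREAM_OFFSET, new EndOfStreamMessage("Partition 2")));
  }

  public static void main(String[] args) {
    // prints "EOS messages consumed: 3"
    System.out.println("EOS messages consumed: " + consumeAggregatePartition(inMemoryPartition()));
    // An EOS without a task name (not sent by an upstream task) still stops polling.
    Envelope sourceEos = new Envelope(END_OF_STREAM_OFFSET, new EndOfStreamMessage(null));
    // prints "source EOS stops polling: true"
    System.out.println("source EOS stops polling: " + sourceEos.isEndOfStream());
  }
}
```

Keeping the offset check as the fallback means systems that signal end-of-stream
only through the offset behave as before; only Samza-internal end-of-stream
messages are exempted.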



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
