[ https://issues.apache.org/jira/browse/SAMZA-2300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Cameron Lee reassigned SAMZA-2300:
----------------------------------
Assignee: (was: Cameron Lee)
> Incomplete propagation of end-of-stream messages for intermediate stream
> operators
> ----------------------------------------------------------------------------------
>
> Key: SAMZA-2300
> URL: https://issues.apache.org/jira/browse/SAMZA-2300
> Project: Samza
> Issue Type: Bug
> Reporter: Cameron Lee
> Priority: Major
>
> Summary:
> If the intermediate stream of an operator (e.g. partitionBy) has multiple
> partitions, and the intermediate system returns
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET for end-of-stream messages, then
> those end-of-stream messages may not be propagated to the other
> partitions.
> More context:
> End-of-stream propagation currently works by aggregating all end-of-stream
> messages in a single partition and then broadcasting once that single
> partition gets the expected number of end-of-stream messages. This means a
> single partition needs to wait for multiple end-of-stream messages. However,
> in SystemConsumers, once an end-of-stream message is found, the stream is
> marked so that it is no longer polled for more messages. As a result, the
> aggregate partition may not consume all end-of-stream messages, in which
> case it never broadcasts to the other partitions.
> This issue was found while trying to migrate some tests to use the in-memory
> system. The in-memory system explicitly sets the offset to
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET. Existing tests which use the
> in-memory system only have a single partition, so those work. The reason
> we probably haven't seen this bug in real use cases is that the Kafka system
> does not set the offset to IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
> It is possible that this issue could have been avoided if
> IncomingMessageEnvelope did not have two different signals for an
> end-of-stream message (i.e. END_OF_STREAM_OFFSET for "offset" field and
> EndOfStreamMessage for "message" field).
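The failure mode described above can be reproduced in a toy model. This is
only a sketch under stated assumptions, not Samza code: Envelope,
consumeEndOfStreamMessages and aggregatePartition are hypothetical names, and
the "stop polling" rule is a simplification of what the issue attributes to
SystemConsumers.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class EosAggregationSketch {
    // Hypothetical marker; stands in for IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
    static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

    // Hypothetical, simplified envelope: an offset plus a flag for an end-of-stream payload.
    static final class Envelope {
        final String offset;
        final boolean isEndOfStreamMessage;

        Envelope(String offset, boolean isEndOfStreamMessage) {
            this.offset = offset;
            this.isEndOfStreamMessage = isEndOfStreamMessage;
        }
    }

    /**
     * Drains the aggregate partition and returns how many end-of-stream
     * messages were consumed. When stopAtEndOfStreamOffset is true, polling
     * stops at the first envelope carrying END_OF_STREAM_OFFSET (assumed
     * simplification of the behavior described in the issue).
     */
    static int consumeEndOfStreamMessages(Queue<Envelope> aggregatePartition,
                                          boolean stopAtEndOfStreamOffset) {
        int consumed = 0;
        Envelope envelope;
        while ((envelope = aggregatePartition.poll()) != null) {
            if (envelope.isEndOfStreamMessage) {
                consumed++;
            }
            if (stopAtEndOfStreamOffset && END_OF_STREAM_OFFSET.equals(envelope.offset)) {
                break; // the stream is treated as finished and is not polled again
            }
        }
        return consumed;
    }

    /** Builds an aggregate partition holding one end-of-stream message per upstream task. */
    static Queue<Envelope> aggregatePartition(int upstreamTasks, String offset) {
        Queue<Envelope> queue = new ArrayDeque<>();
        for (int i = 0; i < upstreamTasks; i++) {
            queue.add(new Envelope(offset, true));
        }
        return queue;
    }

    public static void main(String[] args) {
        int expected = 3; // upstream tasks that each send one end-of-stream message

        // In-memory-like system: every end-of-stream message carries END_OF_STREAM_OFFSET.
        int seen = consumeEndOfStreamMessages(
            aggregatePartition(expected, END_OF_STREAM_OFFSET), true);
        System.out.println("in-memory-like: consumed " + seen + " of " + expected
            + ", broadcast=" + (seen == expected));

        // Kafka-like system: an ordinary offset, so polling continues.
        seen = consumeEndOfStreamMessages(aggregatePartition(expected, "42"), true);
        System.out.println("kafka-like: consumed " + seen + " of " + expected
            + ", broadcast=" + (seen == expected));
    }
}
```

With a single upstream task the expected count is reached before polling
stops, which matches the observation that single-partition tests pass.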
> Fix:
> To fix this, we can change IncomingMessageEnvelope.isEndOfStream to check the
> message type. We also need to check the EndOfStreamMessage taskName: a
> non-null taskName means that the end-of-stream message came from inside
> Samza, so we don't want to stop polling in those cases.
> Also, we should hide END_OF_STREAM_OFFSET since it should not be needed
> as an external API.
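One possible reading of the proposed fix, as a minimal sketch. The classes
below are hypothetical stand-ins rather than the real Samza types, and
shouldStopPolling is an assumed helper name; where the taskName check would
actually live (in the envelope or in SystemConsumers) is not stated in the
issue.

```java
class EndOfStreamCheckSketch {
    /** Hypothetical stand-in for EndOfStreamMessage; only taskName is modeled. */
    static final class EndOfStreamMessage {
        final String taskName; // non-null when produced inside Samza, per the issue text

        EndOfStreamMessage(String taskName) {
            this.taskName = taskName;
        }
    }

    /** Hypothetical stand-in for IncomingMessageEnvelope. */
    static final class Envelope {
        final String offset;
        final Object message;

        Envelope(String offset, Object message) {
            this.offset = offset;
            this.message = message;
        }

        /** Proposed check: decide by message type; the offset is not consulted. */
        boolean isEndOfStream() {
            return message instanceof EndOfStreamMessage;
        }

        /**
         * Assumed helper: stop polling only for end-of-stream messages that
         * did not originate inside Samza (taskName is null).
         */
        boolean shouldStopPolling() {
            return isEndOfStream() && ((EndOfStreamMessage) message).taskName == null;
        }
    }

    public static void main(String[] args) {
        Envelope fromInputSystem = new Envelope("7", new EndOfStreamMessage(null));
        // "task-0" is a made-up task name for illustration.
        Envelope fromSamzaTask = new Envelope("8", new EndOfStreamMessage("task-0"));
        Envelope regular = new Envelope("9", "payload");

        System.out.println("input system: stop polling = " + fromInputSystem.shouldStopPolling());
        System.out.println("samza task:   stop polling = " + fromSamzaTask.shouldStopPolling());
        System.out.println("regular:      stop polling = " + regular.shouldStopPolling());
    }
}
```

Under this reading, the aggregate partition keeps polling after each
end-of-stream message sent by an upstream task, so it can collect the
expected number before broadcasting.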