[
https://issues.apache.org/jira/browse/SAMZA-2300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Cameron Lee reassigned SAMZA-2300:
----------------------------------
Assignee: Cameron Lee
> Incomplete propagation of end-of-stream messages for intermediate stream
> operators
> ----------------------------------------------------------------------------------
>
> Key: SAMZA-2300
> URL: https://issues.apache.org/jira/browse/SAMZA-2300
> Project: Samza
> Issue Type: Bug
> Reporter: Cameron Lee
> Assignee: Cameron Lee
> Priority: Major
>
> Summary:
> If an intermediate operator (e.g. partitionBy) corresponds to multiple
> partitions, and the intermediate system returns
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET for end-of-stream messages, then
> those end-of-stream messages may not get properly propagated to the other
> partitions.
> More context:
> End-of-stream propagation currently works by aggregating all end-of-stream
> messages in a single partition and then broadcasting once that single
> partition gets the expected number of end-of-stream messages. This means a
> single partition needs to wait for multiple end-of-stream messages. However,
> in SystemConsumers, if an end-of-stream message is found, then it will mark
> the stream to no longer be polled for more messages. This means that the
> aggregate partition may not consume all end-of-stream messages, so it will
> not broadcast to the other partitions.
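The interaction described above can be reproduced with a small stand-alone simulation. This is only a sketch: `EndOfStreamSimulation`, `Envelope`, `countConsumed`, and `wouldBroadcast` are hypothetical names invented for illustration, and the offset constant is a stand-in rather than the real value in Samza. It assumes that polling stops as soon as an envelope carries the end-of-stream offset.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation: these types only mimic the behavior described in
// the issue and are not Samza classes.
final class EndOfStreamSimulation {
  // Stand-in for IncomingMessageEnvelope.END_OF_STREAM_OFFSET (value is assumed).
  static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

  // Minimal envelope: an offset plus a flag for an end-of-stream payload.
  static final class Envelope {
    final String offset;
    final boolean isEndOfStreamMessage;

    Envelope(String offset, boolean isEndOfStreamMessage) {
      this.offset = offset;
      this.isEndOfStreamMessage = isEndOfStreamMessage;
    }
  }

  // Counts the end-of-stream messages the aggregating partition receives
  // before the consumer stops polling it.
  static int countConsumed(List<Envelope> aggregatePartition) {
    int seen = 0;
    for (Envelope envelope : aggregatePartition) {
      if (envelope.isEndOfStreamMessage) {
        seen++;
      }
      if (END_OF_STREAM_OFFSET.equals(envelope.offset)) {
        break; // the partition is no longer polled after this envelope
      }
    }
    return seen;
  }

  // The aggregating partition broadcasts only after seeing the expected count.
  static boolean wouldBroadcast(List<Envelope> aggregatePartition, int expected) {
    return countConsumed(aggregatePartition) >= expected;
  }

  // Every end-of-stream message carries the end-of-stream offset.
  static List<Envelope> inMemoryLike() {
    return Arrays.asList(
        new Envelope(END_OF_STREAM_OFFSET, true),
        new Envelope(END_OF_STREAM_OFFSET, true),
        new Envelope(END_OF_STREAM_OFFSET, true));
  }

  // End-of-stream messages carry ordinary offsets.
  static List<Envelope> kafkaLike() {
    return Arrays.asList(
        new Envelope("10", true),
        new Envelope("11", true),
        new Envelope("12", true));
  }

  public static void main(String[] args) {
    System.out.println("in-memory-like broadcasts: " + wouldBroadcast(inMemoryLike(), 3));
    System.out.println("kafka-like broadcasts: " + wouldBroadcast(kafkaLike(), 3));
  }
}
```

With three expected end-of-stream messages, the in-memory-like partition stops after the first one and never broadcasts, while the Kafka-like partition consumes all three.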
> This issue was found while trying to migrate some tests to use the in-memory
> system. The in-memory system explicitly sets the offset to
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET. Existing tests which use the
> in-memory system only have a single partition, so those work. We probably
> haven't seen this bug in real use cases because the Kafka system does not set
> the offset to IncomingMessageEnvelope.END_OF_STREAM_OFFSET. The tests handle
> the Kafka end-of-stream messages correctly, because those messages are handled
> by the high-level operators, which don't depend on the offset being
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
> It is possible that this issue could have been avoided if
> IncomingMessageEnvelope did not have two different signals for an
> end-of-stream message (i.e. END_OF_STREAM_OFFSET for "offset" field and
> EndOfStreamMessage for "message" field).
> Possible fixes:
> # Change IncomingMessageEnvelope.isEndOfStream to check the message type. We
> also need to check if the EndOfStreamMessage taskName is non-null, which
> means that the end-of-stream message came from inside Samza, so we don't want
> to stop polling in those cases.
> ## This is a bit more complicated: this changes API semantics for what "end
> of stream" means, and it still wouldn't fully clarify what
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET refers to (and there are still
> some usages of IncomingMessageEnvelope.END_OF_STREAM_OFFSET which aren't
> straightforward to remove).
> # Change the in-memory system implementation to only use
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET when the EndOfStreamMessage has
> a null task name. This would make it more similar to Kafka while also keeping
> the capability to have bounded streams in the case where there are no
> intermediate streams (e.g. low-level API, side inputs).
> ## This still leaves confusion regarding usage of
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET vs. EndOfStreamMessage, but it
> does not change API semantics, so it is a simpler and safer change. Ideally,
> some refactoring can be done to consolidate
> IncomingMessageEnvelope.END_OF_STREAM_OFFSET and EndOfStreamMessage, but it
> might not be worth it to do that now.
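The decision at the heart of each possible fix can be sketched as a pair of small predicates. This is a rough illustration under stated assumptions, not the actual patch: `EndOfStreamFixSketch`, `shouldStopPolling`, and `inMemoryOffsetFor` are hypothetical names, the offset constant is a stand-in, and the task name is modeled as a plain nullable string.

```java
// Hypothetical sketch of the two proposed fixes; none of these names exist in Samza.
final class EndOfStreamFixSketch {
  // Stand-in for IncomingMessageEnvelope.END_OF_STREAM_OFFSET (value is assumed).
  static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

  // Fix 1: decide by message type instead of offset. A non-null task name
  // means the end-of-stream message came from inside Samza, so polling
  // must continue in that case.
  static boolean shouldStopPolling(boolean isEndOfStreamMessage, String taskName) {
    return isEndOfStreamMessage && taskName == null;
  }

  // Fix 2: the in-memory system uses the end-of-stream offset only when the
  // end-of-stream message has a null task name; otherwise it keeps the
  // regular offset it would have assigned anyway.
  static String inMemoryOffsetFor(String taskName, String regularOffset) {
    return taskName == null ? END_OF_STREAM_OFFSET : regularOffset;
  }

  public static void main(String[] args) {
    System.out.println(shouldStopPolling(true, null));
    System.out.println(shouldStopPolling(true, "task-0"));
    System.out.println(inMemoryOffsetFor(null, "7"));
    System.out.println(inMemoryOffsetFor("task-0", "7"));
  }
}
```

Both predicates key on the same condition, a null task name, which is why the second fix can reach a similar outcome without changing what `isEndOfStream` means.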