[
https://issues.apache.org/jira/browse/SAMZA-2300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Cameron Lee updated SAMZA-2300:
-------------------------------
Description:
Summary:
If an intermediate operator (e.g. partitionBy) corresponds to multiple
partitions, and the intermediate system returns
IncomingMessageEnvelope.END_OF_STREAM_OFFSET for end-of-stream messages, then
those end-of-stream messages may not get properly propagated to the other
partitions.
More context:
End-of-stream propagation currently works by aggregating all end-of-stream
messages in a single partition and then broadcasting once that single partition
gets the expected number of end-of-stream messages. This means a single
partition needs to wait for multiple end-of-stream messages. However, when
SystemConsumers finds an end-of-stream message, it marks the stream as no
longer needing to be polled for more messages. This means that the aggregate
partition may not consume all end-of-stream messages, so it will not broadcast
to the other partitions.
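The interaction described above can be illustrated with a small, self-contained
simulation. This is only a sketch and not Samza code: the class and method
names are hypothetical, and it assumes that one end-of-stream marker per
upstream task lands in the aggregate partition.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the aggregation behavior; none of these names exist in Samza.
final class EndOfStreamAggregationSketch {

    /**
     * Counts how many end-of-stream markers the aggregate partition consumes.
     * When stopPollingOnFirstMarker is true, the consumer stops polling the
     * stream as soon as it sees the first marker.
     */
    static int consumeMarkers(List<String> upstreamTasks, boolean stopPollingOnFirstMarker) {
        // Assumption: each upstream task writes exactly one marker to the aggregate partition.
        Queue<String> aggregatePartition = new ArrayDeque<>(upstreamTasks);
        int consumed = 0;
        while (!aggregatePartition.isEmpty()) {
            aggregatePartition.poll();
            consumed++;
            if (stopPollingOnFirstMarker) {
                break; // the stream is no longer polled, so remaining markers are never read
            }
        }
        return consumed;
    }

    /** The aggregate partition broadcasts only after consuming every expected marker. */
    static boolean wouldBroadcast(List<String> upstreamTasks, boolean stopPollingOnFirstMarker) {
        return consumeMarkers(upstreamTasks, stopPollingOnFirstMarker) == upstreamTasks.size();
    }

    public static void main(String[] args) {
        List<String> tasks = Arrays.asList("task-0", "task-1", "task-2");
        System.out.println(wouldBroadcast(tasks, true));  // false: only 1 of 3 markers consumed
        System.out.println(wouldBroadcast(tasks, false)); // true: all 3 markers consumed
    }
}
```

With a single expected marker the two modes behave the same in this sketch,
which is consistent with the single-partition tests not hitting the problem.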
This issue was found while trying to migrate some tests to use the in-memory
system. The in-memory system explicitly sets the offset to
IncomingMessageEnvelope.END_OF_STREAM_OFFSET. Existing tests which use the
in-memory system only have a single partition, so those work. We probably
haven't seen this bug in real use cases because the Kafka system does not set
the offset to IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
It is possible that this issue could have been avoided if
IncomingMessageEnvelope did not have two different signals for an end-of-stream
message (i.e. END_OF_STREAM_OFFSET for "offset" field and EndOfStreamMessage
for "message" field).
Fix:
To fix this, we can change IncomingMessageEnvelope.isEndOfStream to check the
message type. We also need to check whether the EndOfStreamMessage taskName is
non-null: a non-null taskName means that the end-of-stream message came from
inside Samza, so we don't want to stop polling in those cases.
Also, we should hide the END_OF_STREAM_OFFSET since it should not be needed as
an external API.
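One possible reading of the proposed check, sketched with simplified stand-in
types. The Envelope and EndOfStreamMarker classes and the placeholder offset
value below are hypothetical and do not reproduce the real
IncomingMessageEnvelope or EndOfStreamMessage; the sketch only contrasts the
offset-based signal with a message-type check that ignores markers carrying a
task name.

```java
// Hypothetical stand-ins for illustration only; not the Samza classes or their fields.
final class EndOfStreamCheckSketch {

    // Placeholder value; the real constant's value is not assumed here.
    static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

    /** Simplified marker: a non-null taskName means it was produced inside the job. */
    static final class EndOfStreamMarker {
        final String taskName;

        EndOfStreamMarker(String taskName) {
            this.taskName = taskName;
        }
    }

    /** Simplified envelope carrying both end-of-stream signals (offset and message). */
    static final class Envelope {
        final String offset;
        final Object message;

        Envelope(String offset, Object message) {
            this.offset = offset;
            this.message = message;
        }

        /** Offset-based check: treats any envelope with the special offset as end of stream. */
        boolean isEndOfStreamByOffset() {
            return END_OF_STREAM_OFFSET.equals(offset);
        }

        /** Message-based check: only markers without a task name end polling. */
        boolean isEndOfStreamByMessage() {
            return message instanceof EndOfStreamMarker
                && ((EndOfStreamMarker) message).taskName == null;
        }
    }

    public static void main(String[] args) {
        // Marker sent by an upstream task through an intermediate stream.
        Envelope internal = new Envelope(END_OF_STREAM_OFFSET, new EndOfStreamMarker("task-0"));
        // Marker with no task name, i.e. not produced inside the job.
        Envelope external = new Envelope(END_OF_STREAM_OFFSET, new EndOfStreamMarker(null));

        System.out.println(internal.isEndOfStreamByOffset());  // true: polling would stop early
        System.out.println(internal.isEndOfStreamByMessage()); // false: polling continues
        System.out.println(external.isEndOfStreamByMessage()); // true: polling stops
    }
}
```

Under this reading, markers that carry a task name keep flowing to the
aggregate partition, while a marker without a task name still ends polling.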
was:
Summary:
If an intermediate operator (e.g. partitionBy) corresponds to multiple
partitions, and the intermediate system returns
IncomingMessageEnvelope.END_OF_STREAM_OFFSET for end-of-stream messages, then
those end-of-stream messages will not get properly propagated to the other
partitions.
More context:
End-of-stream propagation currently works by aggregating all end-of-stream
messages in a single partition and then broadcasting once that single partition
gets the expected number of end-of-stream messages. This means a single
partition needs to wait for multiple end-of-stream messages. However, when
SystemConsumers finds an end-of-stream message, it marks the stream as no
longer needing to be polled for more messages. This means that the aggregate
partition may not consume all end-of-stream messages, so it will not broadcast
to the other partitions.
This issue was found while trying to migrate some tests to use the in-memory
system. The in-memory system explicitly sets the offset to
IncomingMessageEnvelope.END_OF_STREAM_OFFSET. Existing tests which use the
in-memory system only have a single partition, so those work. We probably
haven't seen this bug in real use cases because the Kafka system does not set
the offset to IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
It is possible that this issue could have been avoided if
IncomingMessageEnvelope did not have two different signals for an end-of-stream
message (i.e. END_OF_STREAM_OFFSET for "offset" field and EndOfStreamMessage
for "message" field).
Fix:
To fix this, we can change IncomingMessageEnvelope.isEndOfStream to check the
message type. We also need to check whether the EndOfStreamMessage taskName is
non-null: a non-null taskName means that the end-of-stream message came from
inside Samza, so we don't want to stop polling in those cases.
Also, we should hide the END_OF_STREAM_OFFSET since it should not be needed as
an external API.
> Incomplete propagation of end-of-stream messages for intermediate stream
> operators
> ----------------------------------------------------------------------------------
>
> Key: SAMZA-2300
> URL: https://issues.apache.org/jira/browse/SAMZA-2300
> Project: Samza
> Issue Type: Bug
> Reporter: Cameron Lee
> Assignee: Cameron Lee
> Priority: Major