[ https://issues.apache.org/jira/browse/SAMZA-2300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cameron Lee updated SAMZA-2300:
-------------------------------
    Description: 
Summary:

If an intermediate operator (e.g. partitionBy) corresponds to multiple 
partitions, and the intermediate system returns 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET for end-of-stream messages, then 
those end-of-stream messages may not get properly propagated to the other 
partitions.

More context:

End-of-stream propagation currently works by aggregating all end-of-stream 
messages in a single partition and then broadcasting once that single partition 
gets the expected number of end-of-stream messages. This means a single 
partition needs to wait for multiple end-of-stream messages. However, as soon as 
SystemConsumers finds an end-of-stream message (i.e. an envelope whose offset is 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET), it marks the stream to no longer 
be polled for more messages. As a result, the aggregate partition may stop being 
polled after its first end-of-stream message, never receive the remaining ones, 
and therefore never broadcast to the other partitions.
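The failure mode can be illustrated with a small simulation. This is a simplified model under stated assumptions, not Samza's actual SystemConsumers code: the names Envelope, EOS_OFFSET, aggregatePartition, consumeAggregatePartition and broadcasts are hypothetical, and the model assumes that the aggregate partition receives exactly one end-of-stream message per upstream task and that polling stops at the first envelope carrying the end-of-stream offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of end-of-stream aggregation; these are not
// Samza's real classes or methods.
final class EosPropagationSketch {
  // Stand-in for IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
  static final String EOS_OFFSET = "END_OF_STREAM";

  // Minimal envelope: an offset plus a flag marking an end-of-stream
  // control message sent by an upstream task.
  static final class Envelope {
    final String offset;
    final boolean isEosMessage;

    Envelope(String offset, boolean isEosMessage) {
      this.offset = offset;
      this.isEosMessage = isEosMessage;
    }
  }

  // Messages landing on the aggregate partition: one end-of-stream message
  // per upstream task. setEosOffset=true mimics the in-memory system (every
  // end-of-stream message carries EOS_OFFSET); false mimics Kafka (regular
  // numeric offsets).
  static List<Envelope> aggregatePartition(int upstreamTasks, boolean setEosOffset) {
    List<Envelope> messages = new ArrayList<>();
    for (int i = 0; i < upstreamTasks; i++) {
      String offset = setEosOffset ? EOS_OFFSET : Integer.toString(i);
      messages.add(new Envelope(offset, true));
    }
    return messages;
  }

  // Counts the end-of-stream messages the aggregator actually receives; the
  // consumer stops polling the partition once it has returned an envelope
  // whose offset is EOS_OFFSET.
  static int consumeAggregatePartition(List<Envelope> partition) {
    int eosSeen = 0;
    for (Envelope envelope : partition) {
      if (envelope.isEosMessage) {
        eosSeen++;
      }
      if (EOS_OFFSET.equals(envelope.offset)) {
        break; // stream marked as finished; remaining messages are never polled
      }
    }
    return eosSeen;
  }

  // The aggregator broadcasts only after seeing the expected number of
  // end-of-stream messages (one per upstream task in this model).
  static boolean broadcasts(int upstreamTasks, boolean setEosOffset) {
    return consumeAggregatePartition(aggregatePartition(upstreamTasks, setEosOffset))
        == upstreamTasks;
  }

  public static void main(String[] args) {
    System.out.println(broadcasts(3, true));  // false: polling stops after the first one
    System.out.println(broadcasts(3, false)); // true: all three are consumed
    System.out.println(broadcasts(1, true));  // true: a single upstream task hides the bug
  }
}
```

In this model, setting the end-of-stream offset on every message (as the in-memory system does) lets the aggregator see only one of three end-of-stream messages, while regular offsets (as with Kafka) or a single-partition setup let it reach the expected count.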

This issue was found while trying to migrate some tests to use the in-memory 
system. The in-memory system explicitly sets the offset to 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET. Existing tests that use the 
in-memory system only have a single partition, so they work. We have probably 
not seen this bug in real use cases because the Kafka system does not set the 
offset to IncomingMessageEnvelope.END_OF_STREAM_OFFSET. Tests that use Kafka 
still handle its end-of-stream messages correctly, because those messages are 
handled by the high-level operators, which don't depend on the offset being 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET.

This issue might have been avoided if IncomingMessageEnvelope did not have two 
different signals for an end-of-stream message (END_OF_STREAM_OFFSET in the 
"offset" field and EndOfStreamMessage in the "message" field).
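To make the two signals concrete, the hypothetical model below gives one envelope type both fields; code that checks only the offset can reach a different conclusion from code that checks only the message. The class names, the offset string, and the task name "task-0" are illustrative assumptions, not Samza's API. Following the description, the Kafka-like envelope carries an EndOfStreamMessage with a regular offset, while the in-memory-like envelope also sets the end-of-stream offset.

```java
// Hypothetical model of the two end-of-stream signals an envelope can carry.
// These classes are illustrative stand-ins, not Samza's API.
final class TwoSignalsSketch {
  // Stand-in for IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
  static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

  // Stand-in for EndOfStreamMessage; taskName identifies the sending task.
  static final class EndOfStreamMessage {
    final String taskName;

    EndOfStreamMessage(String taskName) {
      this.taskName = taskName;
    }
  }

  static final class Envelope {
    final String offset;
    final Object message;

    Envelope(String offset, Object message) {
      this.offset = offset;
      this.message = message;
    }

    // Signal 1: the "offset" field.
    boolean offsetSignalsEnd() {
      return END_OF_STREAM_OFFSET.equals(offset);
    }

    // Signal 2: the "message" field.
    boolean messageSignalsEnd() {
      return message instanceof EndOfStreamMessage;
    }
  }

  public static void main(String[] args) {
    EndOfStreamMessage eos = new EndOfStreamMessage("task-0"); // hypothetical task name
    Envelope kafkaLike = new Envelope("42", eos);
    Envelope inMemoryLike = new Envelope(END_OF_STREAM_OFFSET, eos);
    // Kafka-like: only the message field signals end of stream.
    System.out.println(kafkaLike.offsetSignalsEnd() + " " + kafkaLike.messageSignalsEnd());
    // In-memory-like: both signals agree.
    System.out.println(inMemoryLike.offsetSignalsEnd() + " " + inMemoryLike.messageSignalsEnd());
  }
}
```

Running main prints "false true" and then "true true", which is the disagreement that lets the same logical end-of-stream message stop polling in one system but not in the other.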

Possible fixes:
 # Change IncomingMessageEnvelope.isEndOfStream to check the message type. We 
also need to check whether the EndOfStreamMessage taskName is non-null: a 
non-null taskName means that the end-of-stream message came from inside Samza, 
and we don't want to stop polling in that case.
 ## This is more complicated: it changes the API semantics of "end of stream", 
and it still wouldn't fully clarify what 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET refers to (and some usages of 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET are not straightforward to 
remove).
 # Change the in-memory system implementation to use 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET only when the EndOfStreamMessage 
has a null task name. This would make it behave more like Kafka while keeping 
the ability to have bounded streams when there are no intermediate streams 
(e.g. low-level API, side inputs).
 ## This still leaves confusion about IncomingMessageEnvelope.END_OF_STREAM_OFFSET 
vs. EndOfStreamMessage, but it does not change API semantics, so it is a 
simpler and safer change. Ideally, these two signals would eventually be 
consolidated through refactoring, but that might not be worth doing now.
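Both options can be sketched against the same kind of hypothetical model. Neither method below is Samza's actual code, and the class, method, and parameter names are assumptions. isEndOfStream follows fix 1: an EndOfStreamMessage with a null taskName ends the stream, while one with a non-null taskName came from inside Samza and does not stop polling. inMemoryOffsetFor follows fix 2: the in-memory system attaches END_OF_STREAM_OFFSET only for an end-of-stream message with a null task name and otherwise uses the message's ordinary position, as Kafka does.

```java
// Hypothetical sketches of the two proposed fixes; not Samza's real API.
final class ProposedFixesSketch {
  // Stand-in for IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
  static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

  // Stand-in for EndOfStreamMessage; a null taskName means the end of stream
  // did not originate from a Samza task.
  static final class EndOfStreamMessage {
    final String taskName;

    EndOfStreamMessage(String taskName) {
      this.taskName = taskName;
    }
  }

  static final class Envelope {
    final String offset;
    final Object message;

    Envelope(String offset, Object message) {
      this.offset = offset;
      this.message = message;
    }
  }

  // Fix 1: decide end of stream from the message rather than the offset.
  // A non-null taskName means an upstream Samza task produced the message,
  // so the consumer keeps polling to collect the remaining ones.
  static boolean isEndOfStream(Envelope envelope) {
    if (!(envelope.message instanceof EndOfStreamMessage)) {
      return false;
    }
    return ((EndOfStreamMessage) envelope.message).taskName == null;
  }

  // Fix 2: the in-memory system chooses which offset to attach. Only an
  // end-of-stream message with a null task name gets END_OF_STREAM_OFFSET;
  // everything else gets its regular position in the partition.
  static String inMemoryOffsetFor(Object message, long position) {
    if (message instanceof EndOfStreamMessage
        && ((EndOfStreamMessage) message).taskName == null) {
      return END_OF_STREAM_OFFSET;
    }
    return Long.toString(position);
  }

  public static void main(String[] args) {
    EndOfStreamMessage fromTask = new EndOfStreamMessage("task-0"); // hypothetical name
    EndOfStreamMessage external = new EndOfStreamMessage(null);
    System.out.println(isEndOfStream(new Envelope("7", fromTask))); // false
    System.out.println(isEndOfStream(new Envelope("8", external))); // true
    System.out.println(inMemoryOffsetFor(fromTask, 7));  // 7
    System.out.println(inMemoryOffsetFor(external, 8));  // END_OF_STREAM
  }
}
```

Under either rule, the end-of-stream messages that upstream tasks write to an intermediate stream no longer stop polling of the aggregate partition, so it can collect the expected number before broadcasting. Fix 2 keeps the existing offset-based check unchanged, which is why the description calls it the simpler and safer change.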

  was:
Summary:

If an intermediate operator (e.g. partitionBy) corresponds to multiple 
partitions, and the intermediate system returns 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET for end-of-stream messages, then 
those end-of-stream messages may not get properly propagated to the other 
partitions.

More context:

End-of-stream propagation currently works by aggregating all end-of-stream 
messages in a single partition and then broadcasting once that single partition 
gets the expected number of end-of-stream messages. This means a single 
partition needs to wait for multiple end-of-stream messages. However, as soon as 
SystemConsumers finds an end-of-stream message (i.e. an envelope whose offset is 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET), it marks the stream to no longer 
be polled for more messages. As a result, the aggregate partition may stop being 
polled after its first end-of-stream message, never receive the remaining ones, 
and therefore never broadcast to the other partitions.

This issue was found while trying to migrate some tests to use the in-memory 
system. The in-memory system explicitly sets the offset to 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET. Existing tests that use the 
in-memory system only have a single partition, so they work. We have probably 
not seen this bug in real use cases because the Kafka system does not set the 
offset to IncomingMessageEnvelope.END_OF_STREAM_OFFSET.

This issue might have been avoided if IncomingMessageEnvelope did not have two 
different signals for an end-of-stream message (END_OF_STREAM_OFFSET in the 
"offset" field and EndOfStreamMessage in the "message" field).

Possible fixes:
 # Change IncomingMessageEnvelope.isEndOfStream to check the message type. We 
also need to check whether the EndOfStreamMessage taskName is non-null: a 
non-null taskName means that the end-of-stream message came from inside Samza, 
and we don't want to stop polling in that case.
 ## This is more complicated: it changes the API semantics of "end of stream", 
and it still wouldn't fully clarify what 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET refers to (and some usages of 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET are not straightforward to 
remove).
 # Change the in-memory system implementation to use 
IncomingMessageEnvelope.END_OF_STREAM_OFFSET only when the EndOfStreamMessage 
has a null task name. This would make it behave more like Kafka while keeping 
the ability to have bounded streams when there are no intermediate streams 
(e.g. low-level API, side inputs).
 ## This still leaves confusion about IncomingMessageEnvelope.END_OF_STREAM_OFFSET 
vs. EndOfStreamMessage, but it does not change API semantics, so it is a 
simpler and safer change. Ideally, these two signals would eventually be 
consolidated through refactoring, but that might not be worth doing now.


> Incomplete propagation of end-of-stream messages for intermediate stream 
> operators
> ----------------------------------------------------------------------------------
>
>                 Key: SAMZA-2300
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2300
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Cameron Lee
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
