cameronlee314 opened a new pull request #1506:
URL: https://github.com/apache/samza/pull/1506


   Symptom:
   1. End-of-stream and watermarks are not properly propagated through Samza 
when side inputs are used.
   2. This prevents many tests from using the `TestRunner` framework, since 
`TestRunner` relies on tasks shutting themselves down when they receive 
end-of-stream messages.
   
   Cause:
   OperatorImplGraph builds EndOfStreamStates and WatermarkStates objects with 
all of the input SSPs from the job model, which includes side-input SSPs. 
However, high-level operator tasks aren't given messages from side-input SSPs, 
so end-of-stream and watermark handling in high-level operators should not 
include side-input SSPs. As a result, end-of-stream and watermark handling 
tries to include side inputs but never updates their states, which can 
prevent tasks from shutting down properly (end-of-stream) and cause watermarks 
to be calculated incorrectly.
   Note: We currently have tests which use partitionBy and side inputs, but 
they only use a single partition, so RunLoop is able to shut down the task 
(RunLoop doesn't check side inputs when determining whether the task has 
reached the end of all streams).
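   To make the cause concrete, here is a simplified, hypothetical model of per-task end-of-stream and watermark tracking. `InputStates`, `simulate`, and the SSP names are invented for illustration and are not Samza's `EndOfStreamStates`/`WatermarkStates` API; the model also assumes, as is typical, that the output watermark is the minimum across tracked partitions. It shows that tracking a side-input SSP that never receives control messages blocks end-of-stream and holds the watermark back, while excluding it (as in change 1 below) lets both complete.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of per-task end-of-stream and watermark tracking.
// Names are illustrative only; this is not Samza's EndOfStreamStates/WatermarkStates API.
public class EndOfStreamSketch {

  static final class InputStates {
    // Tracked partitions that have not yet delivered end-of-stream.
    private final Set<String> pendingEos;
    // Latest watermark seen on each tracked partition.
    private final Map<String, Long> watermarks = new HashMap<>();

    InputStates(Set<String> trackedSsps) {
      pendingEos = new HashSet<>(trackedSsps);
      for (String ssp : trackedSsps) {
        watermarks.put(ssp, Long.MIN_VALUE); // no watermark seen yet
      }
    }

    void onEndOfStream(String ssp) {
      pendingEos.remove(ssp);
    }

    void onWatermark(String ssp, long timestamp) {
      // Untracked partitions are ignored; tracked ones keep their maximum timestamp.
      watermarks.computeIfPresent(ssp, (k, old) -> Math.max(old, timestamp));
    }

    // The task can shut itself down only when every tracked partition has ended.
    boolean allEndOfStream() {
      return pendingEos.isEmpty();
    }

    // Assumed rule: the output watermark is the minimum across tracked partitions.
    long watermark() {
      return watermarks.values().stream().mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
    }
  }

  // Delivers a watermark and end-of-stream on the regular inputs only, because
  // high-level operators never receive messages from side-input partitions.
  static InputStates simulate(Set<String> trackedSsps) {
    InputStates states = new InputStates(trackedSsps);
    for (String ssp : new String[] {"input-0", "input-1"}) {
      states.onWatermark(ssp, 100L);
      states.onEndOfStream(ssp);
    }
    return states;
  }

  public static void main(String[] args) {
    Set<String> allSsps = Set.of("input-0", "input-1", "side-input-0");
    Set<String> sideInputSsps = Set.of("side-input-0");

    // Before: side-input partitions are tracked but never updated.
    InputStates before = simulate(allSsps);
    System.out.println("before: eos=" + before.allEndOfStream() + ", watermark=" + before.watermark());

    // After: side-input partitions are excluded from the tracked set.
    Set<String> nonSideInputSsps = new HashSet<>(allSsps);
    nonSideInputSsps.removeAll(sideInputSsps);
    InputStates after = simulate(nonSideInputSsps);
    System.out.println("after: eos=" + after.allEndOfStream() + ", watermark=" + after.watermark());
  }
}
```

Running `main` prints `before: eos=false, watermark=-9223372036854775808` followed by `after: eos=true, watermark=100`.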
   
   Changes:
   1. Pass the set of SSPs excluding side inputs to high-level operators so 
that they don't read directly from the task model, which does include side 
inputs. High-level operators will then handle end-of-stream and watermark 
propagation without considering side-input SSPs.
   2. Change `InMemoryManager` to only use 
`IncomingMessageEnvelope.END_OF_STREAM_OFFSET` when the `taskName` in the 
`EndOfStreamMessage` is null. This avoids the issue described in SAMZA-2300, 
which causes end-of-stream messages to not be properly aggregated and then 
broadcast to all partitions (see SAMZA-2300 for more details). Some existing 
tests would fail without this change.
   3. Add a unique `app.id` in `TestRunner` for each test. This helps prevent 
clashes between different tests. For example, `ControlMessageSender` has a 
static cache keyed by the stream id of intermediate streams, and multiple tests 
could end up using the same key in that cache. With a unique app id, the 
intermediate streams are unique, so multiple tests won't use the same key in 
the cache.
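   A hypothetical sketch of the clash that a unique `app.id` avoids: a JVM-wide static map keyed by intermediate stream id, where the id is assumed to be built from the app name, app id, and operator id. The id format, the `register` helper, and the use of `UUID` as a stand-in for the randomly generated in-memory scope are all assumptions for illustration; this is not `ControlMessageSender`'s actual code.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of state leaking between tests through a static cache.
// The stream-id format and helper names are assumptions, not ControlMessageSender's code.
public class StaticCacheClashSketch {

  // Static, so it is shared by every test that runs in the same JVM.
  static final Map<String, String> CACHE = new ConcurrentHashMap<>();

  // Assumed format for an intermediate stream id: app name, app id, operator id.
  static String intermediateStreamId(String appName, String appId, String opId) {
    return appName + "-" + appId + "-" + opId;
  }

  // Caches an entry for the stream; returns whichever owner's entry ends up in the cache.
  static String register(String streamId, String owner) {
    String existing = CACHE.putIfAbsent(streamId, owner);
    return existing == null ? owner : existing;
  }

  public static void main(String[] args) {
    // Two tests of the same app with the same (default) app id produce the same key.
    String keyA = intermediateStreamId("my-app", "default", "partition_by-p1");
    String keyB = intermediateStreamId("my-app", "default", "partition_by-p1");
    System.out.println(register(keyA, "testA")); // testA
    System.out.println(register(keyB, "testB")); // testA: testB picks up testA's entry

    // A random per-test app id (UUID stands in for the in-memory scope) keeps keys distinct.
    String keyC = intermediateStreamId("my-app", UUID.randomUUID().toString(), "partition_by-p1");
    String keyD = intermediateStreamId("my-app", UUID.randomUUID().toString(), "partition_by-p1");
    System.out.println(register(keyC, "testC")); // testC
    System.out.println(register(keyD, "testD")); // testD
  }
}
```

Because a static map outlives any single test, only a per-test component in the key (here the app id) keeps entries from different tests apart.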
   
   Tests: Rewrote some tests to use the `TestRunner` and to use multiple 
partitions. This tests that end-of-stream messages are being propagated for the 
multiple-partition case.
   
   API changes (impacts testing framework only):
   1. The default `app.id` used for tests executed by `TestRunner` is set to 
the "in-memory scope", which is a string that is randomly generated for each 
test. Before this change, the `app.id` was not set.
   2. `InMemoryManager` only uses 
`IncomingMessageEnvelope.END_OF_STREAM_OFFSET` when the `EndOfStreamMessage` 
has a null `taskName`. Before this change, `InMemoryManager` used 
`IncomingMessageEnvelope.END_OF_STREAM_OFFSET` for all `EndOfStreamMessage`s.
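   The new offset rule can be sketched as follows. This is a hypothetical model, not `InMemoryManager`'s implementation: `EndOfStream` stands in for `EndOfStreamMessage`, the local `END_OF_STREAM_OFFSET` stands in for `IncomingMessageEnvelope.END_OF_STREAM_OFFSET` (its real value is not assumed), and using the partition size as the ordinary offset is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the offset rule; not InMemoryManager's implementation.
public class InMemoryOffsetSketch {

  // Stand-in for IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
  static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

  // Stand-in for EndOfStreamMessage; taskName is meant for intermediate streams and is null otherwise.
  static final class EndOfStream {
    final String taskName;

    EndOfStream(String taskName) {
      this.taskName = taskName;
    }
  }

  // Messages already written to this (single) in-memory partition.
  private final List<Object> partition = new ArrayList<>();

  // Appends a message and returns the offset assigned to it.
  String put(Object message) {
    String offset;
    if (message instanceof EndOfStream && ((EndOfStream) message).taskName == null) {
      // Only an end-of-stream without a taskName gets the end-of-stream offset.
      offset = END_OF_STREAM_OFFSET;
    } else {
      // Everything else, including task-scoped end-of-stream messages on intermediate
      // streams, gets an ordinary sequential offset (assumed: the partition size).
      offset = String.valueOf(partition.size());
    }
    partition.add(message);
    return offset;
  }

  public static void main(String[] args) {
    InMemoryOffsetSketch p = new InMemoryOffsetSketch();
    System.out.println(p.put("hello"));                        // 0
    System.out.println(p.put(new EndOfStream("Partition 0"))); // 1
    System.out.println(p.put(new EndOfStream(null)));          // END_OF_STREAM
  }
}
```

Only the end-of-stream message with a null `taskName` receives the end-of-stream offset; the task-scoped one is treated like any other message.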
   
   Upgrade/usage instructions:
   1. If tests are written using `TestRunner` and rely on `app.id` being 
unset, they will need to be updated to use/read the new `app.id`. Tests are 
not expected to commonly rely on `app.id`.
   2. If the in-memory system is being used (which includes tests written using 
`TestRunner`), and it is expected that the in-memory system sets 
`END_OF_STREAM_OFFSET` for `EndOfStreamMessage`s whose `taskName` is non-null, 
then that usage will need to be removed. The `taskName` is intended for use by 
intermediate streams, so it shouldn't be used outside of Samza internals 
anyway.

