cameronlee314 opened a new pull request #1506: URL: https://github.com/apache/samza/pull/1506
Symptom:
1. End-of-stream and watermarks are not properly propagated through Samza when side inputs are used.
2. This prevents many tests from using the `TestRunner` framework, since `TestRunner` relies on tasks shutting themselves down based on end-of-stream messages.

Cause: `OperatorImplGraph` builds `EndOfStreamStates` and `WatermarkStates` objects with all of the input SSPs from the job model, which includes side-input SSPs. However, high-level operator tasks aren't given messages from side-input SSPs, so high-level operators should not need to handle end-of-stream and watermarks for them. As a result, end-of-stream and watermark handling tracks side-inputs but never updates their states, which can prevent the task from exiting properly (end-of-stream) and lead to incorrectly calculated watermarks.

Note: We currently have tests which use `partitionBy` and side-inputs, but they only use a single partition, so `RunLoop` is able to shut down the task (`RunLoop` doesn't check side inputs when determining if the task is at the end of all streams).

Changes:
1. Pass the set of SSPs excluding side-inputs to high-level operators so that they don't read directly from the task model, which does have side-inputs. High-level operators will then handle end-of-stream and watermark propagation without considering side-input SSPs.
2. Change `InMemoryManager` to only use `IncomingMessageEnvelope.END_OF_STREAM_OFFSET` when the `taskName` in the `EndOfStreamMessage` is null. This avoids the issue in SAMZA-2300, in which end-of-stream messages are not properly aggregated and then broadcast to all partitions (see SAMZA-2300 for more details). Some existing tests would fail without this change.
3. Add a unique `app.id` in `TestRunner` for each test. This helps prevent clashes between different tests. For example, `ControlMessageSender` has a static cache keyed by the stream id of intermediate streams, and multiple tests could end up using the same key in that cache. With a unique app id, the intermediate streams are unique, so multiple tests won't use the same key in the cache.

Tests: Rewrote some tests to use the `TestRunner` and to use multiple partitions. This tests that end-of-stream messages are propagated in the multiple-partition case.

API changes (impacts testing framework only):
1. The default `app.id` used for tests executed by `TestRunner` is set to the "in-memory scope", which is a string that is randomly generated for each test. Before this change, the `app.id` was not set.
2. `InMemoryManager` only uses `IncomingMessageEnvelope.END_OF_STREAM_OFFSET` when the `EndOfStreamMessage` has a null `taskName`. Before this change, `InMemoryManager` used `IncomingMessageEnvelope.END_OF_STREAM_OFFSET` for all `EndOfStreamMessage`s.

Upgrade/usage instructions:
1. If tests are written using `TestRunner` and rely on `app.id` being unset, they will need to be updated to use/read the new `app.id`. Tests relying on `app.id` are not expected to be common.
2. If the in-memory system is being used (which includes tests written using `TestRunner`), and it is expected that the in-memory system sets `END_OF_STREAM_OFFSET` for messages when the `taskName` is non-null, then that usage will need to be removed. The `taskName` is intended for use by intermediate streams, so it shouldn't be used outside of Samza internals anyway.
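
To illustrate the cause and the first change described above, here is a minimal sketch of why tracking side-input SSPs keeps a task from ever reaching "end of all streams". It does not use the real Samza classes: `EndOfStreamTrackerSketch`, `withoutSideInputs`, and the string SSP names are all hypothetical stand-ins chosen for this example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for end-of-stream tracking; not the real Samza EndOfStreamStates.
final class EndOfStreamTrackerSketch {
  // SSPs that have not yet reported end-of-stream.
  private final Set<String> pending;

  EndOfStreamTrackerSketch(Set<String> trackedSsps) {
    this.pending = new HashSet<>(trackedSsps);
  }

  // Record that end-of-stream was observed on the given SSP.
  void markEndOfStream(String ssp) {
    pending.remove(ssp);
  }

  // True only when every tracked SSP has reached end-of-stream.
  boolean allEndOfStream() {
    return pending.isEmpty();
  }

  // Return a copy of the input SSPs with the side-input SSPs removed.
  static Set<String> withoutSideInputs(Set<String> allSsps, Set<String> sideInputSsps) {
    Set<String> result = new HashSet<>(allSsps);
    result.removeAll(sideInputSsps);
    return result;
  }

  public static void main(String[] args) {
    Set<String> all = new HashSet<>(Arrays.asList("input-0", "input-1", "side-0"));
    Set<String> side = new HashSet<>(Arrays.asList("side-0"));

    EndOfStreamTrackerSketch before = new EndOfStreamTrackerSketch(all);
    EndOfStreamTrackerSketch after = new EndOfStreamTrackerSketch(withoutSideInputs(all, side));

    // Operators only ever see end-of-stream for the regular inputs.
    for (String ssp : Arrays.asList("input-0", "input-1")) {
      before.markEndOfStream(ssp);
      after.markEndOfStream(ssp);
    }

    System.out.println("tracking side inputs: " + before.allEndOfStream());
    System.out.println("excluding side inputs: " + after.allEndOfStream());
  }
}
```

In this sketch the tracker built from all SSPs never completes, because `side-0` is never marked, while the tracker built from the filtered set completes once both regular inputs finish.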

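
The `InMemoryManager` rule from the second change can be sketched roughly as follows. This is not the actual `InMemoryManager` code: the class, the `offsetFor` method, and the constant's value are placeholders for illustration, and falling back to the regular next offset when `taskName` is non-null is an assumption, since the description only states when `END_OF_STREAM_OFFSET` is used.

```java
// Hypothetical sketch of the offset rule; not the real InMemoryManager implementation.
final class EndOfStreamOffsetSketch {
  // Placeholder for IncomingMessageEnvelope.END_OF_STREAM_OFFSET (value assumed for illustration).
  static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

  private EndOfStreamOffsetSketch() {
  }

  // Choose the offset for an end-of-stream message.
  // taskName == null: use the special end-of-stream offset.
  // taskName != null: the message is meant for intermediate-stream aggregation, so
  // (by assumption in this sketch) it keeps the regular next offset.
  static String offsetFor(String taskName, long nextOffset) {
    if (taskName == null) {
      return END_OF_STREAM_OFFSET;
    }
    return Long.toString(nextOffset);
  }
}
```

For example, `offsetFor(null, 7)` yields the end-of-stream marker, while `offsetFor("Partition 0", 7)` yields `"7"` (the task name here is made up).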