ajothomas opened a new pull request, #1639: URL: https://github.com/apache/samza/pull/1639
# Symptoms and Cause:

This PR fixes several issues with the processing of drain messages. They were encountered during end-to-end testing with test pipelines.

1. The `RunLoop` ran into an exception when tasks tried to consume from the intermediate stream. This happened because we stop all consumers when we encounter drain, which also stops the consumers that ingest data from the intermediate streams.

After 1 was resolved, some additional issues were observed:

2. The pipeline got stuck after partial consumption of data after the last shuffle stage. This happened because we were removing the SSPs from a task's processing set of SSPs for high-level API processing. That left drain messages unprocessed, so the pipeline remained stuck.
3. The pipeline got stuck if a drain message was encountered prior to the start of the pipeline. This happened because we set the `SystemConsumers` to drain mode before it is even started by `SamzaContainer`.

# Changes:

1. Changed the `SystemConsumers` code to remove the line which stopped registered `SystemConsumer`s on drain.
2. Restricted the logic that removes the SSP from the processing SSP set to the high-level API only. Additionally, the `TaskCoordinator` is asked to commit the task once all streams for a task have drained.
3. To fix the pipeline getting stuck if a drain control message was present prior to container start, we write drain and watermark control messages on `SystemConsumers` start if it is in drain mode.

Additionally, the drain-related integration tests (`DrainLowLevelApiIntegrationTest` and `DrainHighLevelApiIntegrationTest`) have been enabled, as most of the flakiness and edge cases have been fixed now. There is a small chance that both tests can be flaky on rare occasions, as `TestRunner.run` and in-memory metadata store writes happen in separate threads.
Despite a generous 5-second delay for metadata store writes, the order can still get mixed up, which causes the test to fail. Added a test rule, `RetryRule`, to retry the tests if they fail the first time.

# Tests:

- End-to-end tests for both the high-level and low-level API
- Unit tests for drain in `RunLoop`
- Integration tests for drain for the high-level and low-level API: `DrainLowLevelApiIntegrationTest` and `DrainHighLevelApiIntegrationTest`

# API changes:

None
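
For illustration only, the "commit once all streams for a task have drained" behaviour described under Changes could be sketched roughly as below. This is not the code in this PR: `DrainTracker`, `onDrainMessage`, and the use of plain strings in place of SSPs are hypothetical names chosen for the sketch, and the real `RunLoop` and `TaskCoordinator` interaction is more involved.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch: tracks which input streams of a task still await a
 * drain message and signals exactly once when all of them have drained.
 * Strings stand in for SSPs; none of these names come from Samza.
 */
class DrainTracker {
    private final Set<String> pendingStreams;
    private boolean commitRequested = false;

    DrainTracker(Set<String> inputStreams) {
        // Copy so the caller's set is never mutated.
        this.pendingStreams = new HashSet<>(inputStreams);
    }

    /**
     * Records a drain message for the given stream.
     * Returns true only on the call that drains the last pending stream,
     * which is the point where a commit would be requested.
     */
    boolean onDrainMessage(String stream) {
        pendingStreams.remove(stream);
        if (pendingStreams.isEmpty() && !commitRequested) {
            commitRequested = true;
            return true;
        }
        return false;
    }

    boolean isCommitRequested() {
        return commitRequested;
    }

    int pendingCount() {
        return pendingStreams.size();
    }

    public static void main(String[] args) {
        Set<String> inputs = new HashSet<>();
        inputs.add("input-0");
        inputs.add("intermediate-0");
        DrainTracker tracker = new DrainTracker(inputs);
        System.out.println(tracker.onDrainMessage("input-0"));
        System.out.println(tracker.onDrainMessage("intermediate-0"));
    }
}
```

In this sketch the commit signal fires only once, so a duplicate drain message for an already-drained stream does not trigger a second commit request.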
