Repository: samza Updated Branches: refs/heads/master 445d1e26c -> 94ff28c56
SAMZA-1476: Fix TestStatefulTask flaky test Unable to reproduce the issue even after constraining CPU/memory resources available to the JVM, this fix addresses a potential root cause â a `CountDownLatch` shared between 2 threads, main test thread and Kafka producer thread, but not marked volatile even though it is reinitialized by the main test thread. This could cause the reported issue since each of the 2 threads could end up invoking `countDown`/`await` on a different `CountDownLatch` object. It is worthwhile to mention that without this fix, a different test, `TestShutdownStatefulTask`, should have also exhibited some flakiness since it shares the same `TestTask` base with `TestStatefulTask` and exercises exactly the same portion of code that includes the failing assertion. Since no such flakiness was reported for `TestShutdownStatefulTask` however, the assumption made by this fix is that it might have not been encountered or reported. Author: Ahmed Abdul Hamid <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes #498 from ahmedahamid/dev/fix-1476 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/94ff28c5 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/94ff28c5 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/94ff28c5 Branch: refs/heads/master Commit: 94ff28c56271902db18d31b5e7b195caa69e311a Parents: 445d1e2 Author: Ahmed Abdul Hamid <[email protected]> Authored: Tue May 1 13:00:38 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Tue May 1 13:00:38 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/test/integration/StreamTaskTestUtil.scala | 2 +- .../scala/org/apache/samza/test/integration/TestStatefulTask.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/94ff28c5/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala index a381a59..864d2e5 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala @@ -323,7 +323,7 @@ object TestTask { abstract class TestTask extends StreamTask with InitableTask { var received = ArrayBuffer[String]() val initFinished = new CountDownLatch(1) - var gotMessage = new CountDownLatch(1) + @volatile var gotMessage = new CountDownLatch(1) def init(config: Config, context: TaskContext) { TestTask.register(context.getTaskName, this) http://git-wip-us.apache.org/repos/asf/samza/blob/94ff28c5/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala index 85251f1..ccd5eaa 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala @@ -78,7 +78,7 @@ class TestStatefulTask extends StreamTaskTestUtil { // since the second part of the test expects to replay the input streams. "systems.kafka.streams.input.samza.reset.offset" -> "true")) - //@Test + @Test def testShouldStartAndRestore { // Have to do this in one test to guarantee ordering. testShouldStartTaskForFirstTime
