Repository: samza
Updated Branches:
  refs/heads/master 445d1e26c -> 94ff28c56


SAMZA-1476: Fix TestStatefulTask flaky test

Unable to reproduce the issue even after constraining CPU/memory resources 
available to the JVM, this fix addresses a potential root cause — a 
`CountDownLatch` shared between 2 threads, main test thread and Kafka producer 
thread, but not marked volatile even though it is reinitialized by the main 
test thread. This could cause the reported issue since each of the 2 threads 
could end up invoking `countDown`/`await` on a different `CountDownLatch` 
object.

It is worthwhile to mention that without this fix, a different test, 
`TestShutdownStatefulTask`, should have also exhibited some flakiness since it 
shares the same `TestTask` base with `TestStatefulTask` and exercises exactly 
the same portion of code that includes the failing assertion. Since no such 
flakiness was reported for `TestShutdownStatefulTask` however, the assumption 
made by this fix is that it might have not been encountered or reported.

Author: Ahmed Abdul Hamid <[email protected]>

Reviewers: Prateek Maheshwari <[email protected]>

Closes #498 from ahmedahamid/dev/fix-1476


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/94ff28c5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/94ff28c5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/94ff28c5

Branch: refs/heads/master
Commit: 94ff28c56271902db18d31b5e7b195caa69e311a
Parents: 445d1e2
Author: Ahmed Abdul Hamid <[email protected]>
Authored: Tue May 1 13:00:38 2018 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Tue May 1 13:00:38 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/test/integration/StreamTaskTestUtil.scala     | 2 +-
 .../scala/org/apache/samza/test/integration/TestStatefulTask.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/94ff28c5/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
 
b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index a381a59..864d2e5 100644
--- 
a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ 
b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -323,7 +323,7 @@ object TestTask {
 abstract class TestTask extends StreamTask with InitableTask {
   var received = ArrayBuffer[String]()
   val initFinished = new CountDownLatch(1)
-  var gotMessage = new CountDownLatch(1)
+  @volatile var gotMessage = new CountDownLatch(1)
 
   def init(config: Config, context: TaskContext) {
     TestTask.register(context.getTaskName, this)

http://git-wip-us.apache.org/repos/asf/samza/blob/94ff28c5/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
 
b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 85251f1..ccd5eaa 100644
--- 
a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ 
b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -78,7 +78,7 @@ class TestStatefulTask extends StreamTaskTestUtil {
     // since the second part of the test expects to replay the input streams.
     "systems.kafka.streams.input.samza.reset.offset" -> "true"))
 
-  //@Test
+  @Test
   def testShouldStartAndRestore {
     // Have to do this in one test to guarantee ordering.
     testShouldStartTaskForFirstTime

Reply via email to