Repository: incubator-beam
Updated Branches:
  refs/heads/master 039d71328 -> e953cb022


[BEAM-220] Fix flaky KafkaIO test


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/539bc4fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/539bc4fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/539bc4fe

Branch: refs/heads/master
Commit: 539bc4fe37575f4600330c53b2e16a754ef26c2c
Parents: 039d713
Author: Raghu Angadi <[email protected]>
Authored: Mon Apr 25 11:43:48 2016 -0700
Committer: Dan Halperin <[email protected]>
Committed: Mon Apr 25 13:46:52 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 32 +++++++++++++++-----
 1 file changed, 25 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/539bc4fe/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 96ffc98..f766d73 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -58,6 +58,7 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -337,6 +338,16 @@ public class KafkaIOTest {
     }
   }
 
+  // Kafka records are read in a separate thread inside the reader. As a result advance() might not
+  // read any records even from the mock consumer, especially for the first record.
+  // This is a helper method to loop until we read a record.
+  private static void advanceOnce(UnboundedReader<?> reader) throws IOException {
+    while (!reader.advance()) {
+      // very rarely will there be more than one attempts.
+      // in case of a bug we might end up looping forever, and test will fail with a timeout.
+    }
+  }
+
   @Test
   public void testUnboundedSourceCheckpointMark() throws Exception {
     int numElements = 85; // 85 to make sure some partitions have more records than other.
@@ -350,16 +361,17 @@ public class KafkaIOTest {
 
     UnboundedReader<KafkaRecord<byte[], Long>> reader = source.createReader(null, null);
     final int numToSkip = 3;
-    // advance once:
-    assertTrue(reader.start());
 
-    // Advance the source numToSkip-1 elements and manually save state.
+    // advance numToSkip elements
+    if (!reader.start()) {
+      advanceOnce(reader);
+    }
+
     for (long l = 0; l < numToSkip - 1; ++l) {
-      assertTrue(reader.advance());
+      advanceOnce(reader);
     }
 
     // Confirm that we get the expected element in sequence before checkpointing.
-
     assertEquals(numToSkip - 1, (long) reader.getCurrent().getKV().getValue());
     assertEquals(numToSkip - 1, reader.getCurrentTimestamp().getMillis());
 
@@ -367,14 +379,20 @@ public class KafkaIOTest {
     KafkaCheckpointMark mark = CoderUtils.clone(
         source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark());
     reader = source.createReader(null, mark);
-    assertTrue(reader.start());
 
     // Confirm that we get the next elements in sequence.
     // This also confirms that Reader interleaves records from each partitions by the reader.
+
+    if (!reader.start()) {
+      advanceOnce(reader);
+    }
+
     for (int i = numToSkip; i < numElements; i++) {
       assertEquals(i, (long) reader.getCurrent().getKV().getValue());
       assertEquals(i, reader.getCurrentTimestamp().getMillis());
-      reader.advance();
+      if ((i + 1) < numElements) {
+        advanceOnce(reader);
+      }
     }
   }
 }

Reply via email to