Repository: incubator-beam
Updated Branches:
  refs/heads/master 039d71328 -> e953cb022
[BEAM-220] Fix flaky KafkaIO test


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/539bc4fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/539bc4fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/539bc4fe

Branch: refs/heads/master
Commit: 539bc4fe37575f4600330c53b2e16a754ef26c2c
Parents: 039d713
Author: Raghu Angadi <[email protected]>
Authored: Mon Apr 25 11:43:48 2016 -0700
Committer: Dan Halperin <[email protected]>
Committed: Mon Apr 25 13:46:52 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java  | 32 +++++++++++++++-----
 1 file changed, 25 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/539bc4fe/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 96ffc98..f766d73 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -58,6 +58,7 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -337,6 +338,16 @@ public class KafkaIOTest {
     }
   }
 
+  // Kafka records are read in a separate thread inside the reader. As a result advance() might not
+  // read any records even from the mock consumer, especially for the first record.
+  // This is a helper method to loop until we read a record.
+  private static void advanceOnce(UnboundedReader<?> reader) throws IOException {
+    while (!reader.advance()) {
+      // very rarely will there be more than one attempts.
+      // in case of a bug we might end up looping forever, and test will fail with a timeout.
+    }
+  }
+
   @Test
   public void testUnboundedSourceCheckpointMark() throws Exception {
     int numElements = 85; // 85 to make sure some partitions have more records than other.
@@ -350,16 +361,17 @@
     UnboundedReader<KafkaRecord<byte[], Long>> reader = source.createReader(null, null);
     final int numToSkip = 3;
 
-    // advance once:
-    assertTrue(reader.start());
-    // Advance the source numToSkip-1 elements and manually save state.
+    // advance numToSkip elements
+    if (!reader.start()) {
+      advanceOnce(reader);
+    }
+
     for (long l = 0; l < numToSkip - 1; ++l) {
-      assertTrue(reader.advance());
+      advanceOnce(reader);
     }
 
     // Confirm that we get the expected element in sequence before checkpointing.
-
     assertEquals(numToSkip - 1, (long) reader.getCurrent().getKV().getValue());
     assertEquals(numToSkip - 1, reader.getCurrentTimestamp().getMillis());
 
@@ -367,14 +379,20 @@
     KafkaCheckpointMark mark = CoderUtils.clone(
         source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark());
     reader = source.createReader(null, mark);
-    assertTrue(reader.start());
 
     // Confirm that we get the next elements in sequence.
    // This also confirms that Reader interleaves records from each partitions by the reader.
+
+    if (!reader.start()) {
+      advanceOnce(reader);
+    }
+
     for (int i = numToSkip; i < numElements; i++) {
       assertEquals(i, (long) reader.getCurrent().getKV().getValue());
       assertEquals(i, reader.getCurrentTimestamp().getMillis());
-      reader.advance();
+      if ((i + 1) < numElements) {
+        advanceOnce(reader);
+      }
     }
   }
 }
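
----------------------------------------------------------------------

For readers following the fix, the pattern it introduces can be summarized outside the
diff. The sketch below is illustrative only and is not part of the commit: it assumes
Beam's org.apache.beam.sdk.io.UnboundedSource.UnboundedReader API (start(), advance(),
getCurrent() are real Beam methods), while the class and method names ReaderPollingSketch,
readNextRecord and readFirstRecord are hypothetical placeholders. The idea is that the
KafkaIO reader fetches records on a background consumer thread, so a false return from
start() or advance() means "no record ready yet", not "no more data", and tests must poll.

import java.io.IOException;

import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;

final class ReaderPollingSketch {

  // Analogue of the advanceOnce() helper in the commit: keep polling advance()
  // until the background consumer thread has made a record available.
  // In a test, the surrounding test timeout bounds this loop if something is broken.
  static void readNextRecord(UnboundedReader<?> reader) throws IOException {
    while (!reader.advance()) {
      // Not ready yet; retry. In practice this rarely takes more than a few iterations.
    }
  }

  // Analogue of the call sites in the commit: start() may also return false before the
  // first record has been fetched, so it gets the same polling treatment.
  static void readFirstRecord(UnboundedReader<?> reader) throws IOException {
    if (!reader.start()) {
      readNextRecord(reader);
    }
    // reader.getCurrent() may now be called safely.
  }
}

This is why the test no longer asserts on the boolean returned by start()/advance():
those assertions made the test flaky, since a false return is a normal transient state
for this reader rather than a failure.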
