Repository: flink Updated Branches: refs/heads/master 6c310a762 -> 7477c5b57
http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java index fa93138..38a3ce8 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java @@ -17,14 +17,11 @@ */ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; @@ -32,21 +29,14 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.util.SerializedValue; import org.junit.Assert; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; 
-import org.mockito.stubbing.Answer; -import java.io.Serializable; import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Map; -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.anyMapOf; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; /** @@ -58,55 +48,13 @@ import static org.mockito.Mockito.mock; */ public class FlinkKafkaConsumerBaseMigrationTest { - private static String getResourceFilename(String filename) { - ClassLoader cl = FlinkKafkaConsumerBaseMigrationTest.class.getClassLoader(); - URL resource = cl.getResource(filename); - if (resource == null) { - throw new NullPointerException("Missing snapshot resource."); - } - return resource.getFile(); - } - + /** Test restoring from an legacy empty state, when no partitions could be found for topics. */ @Test public void testRestoreFromFlink11WithEmptyStateNoPartitions() throws Exception { - // -------------------------------------------------------------------- - // prepare fake states - // -------------------------------------------------------------------- + final DummyFlinkKafkaConsumer<String> consumerFunction = + new DummyFlinkKafkaConsumer<>(Collections.<KafkaTopicPartition>emptyList()); - final OneShotLatch latch = new OneShotLatch(); - final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class); - - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - latch.trigger(); - Assert.fail("This should never be called"); - return null; - } - }).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, Long.class)); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - latch.trigger(); - Assert.fail("This should never be called"); - return null; - } - }).when(fetcher).runFetchLoop(); - - 
final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>( - new FetcherFactory<String>() { - private static final long serialVersionUID = -2803131905656983619L; - - @Override - public AbstractFetcher<String, ?> createFetcher() { - return fetcher; - } - }, - Collections.<KafkaTopicPartition>emptyList()); - - StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator = - new StreamSource<>(consumerFunction); + StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator = new StreamSource<>(consumerFunction); final AbstractStreamOperatorTestHarness<String> testHarness = new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0); @@ -114,88 +62,30 @@ public class FlinkKafkaConsumerBaseMigrationTest { testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); testHarness.setup(); + // restore state from binary snapshot file using legacy method testHarness.initializeStateFromLegacyCheckpoint( getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state")); testHarness.open(); - final Throwable[] error = new Throwable[1]; - - // run the source asynchronously - Thread runner = new Thread() { - @Override - public void run() { - try { - consumerFunction.run(new DummySourceContext() { - @Override - public void collect(String element) { - latch.trigger(); - Assert.fail("This should never be called."); - } - - @Override - public void emitWatermark(Watermark mark) { - latch.trigger(); - assertEquals(Long.MAX_VALUE, mark.getTimestamp()); - } - }); - } - catch (Throwable t) { - t.printStackTrace(); - error[0] = t; - } - } - }; - runner.start(); + // assert that no partitions were found and is empty + Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null); + Assert.assertTrue(consumerFunction.getSubscribedPartitions().isEmpty()); - if (!latch.isTriggered()) { - latch.await(); - } + // assert that no state was restored + Assert.assertTrue(consumerFunction.getRestoredState() == 
null); consumerOperator.close(); - consumerOperator.cancel(); - runner.interrupt(); - runner.join(); - - Assert.assertNull(error[0]); } + /** Test restoring from an empty state taken using Flink 1.1, when some partitions could be found for topics. */ @Test public void testRestoreFromFlink11WithEmptyStateWithPartitions() throws Exception { - final OneShotLatch latch = new OneShotLatch(); - final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class); - - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - latch.trigger(); - Assert.fail("This should never be called"); - return null; - } - }).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, Long.class)); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - latch.trigger(); - return null; - } - }).when(fetcher).runFetchLoop(); - final List<KafkaTopicPartition> partitions = new ArrayList<>(); partitions.add(new KafkaTopicPartition("abc", 13)); partitions.add(new KafkaTopicPartition("def", 7)); - final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>( - new FetcherFactory<String>() { - private static final long serialVersionUID = -2803131905656983619L; - - @Override - public AbstractFetcher<String, ?> createFetcher() { - return fetcher; - } - }, - partitions); + final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>(partitions); StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator = new StreamSource<>(consumerFunction); @@ -206,89 +96,31 @@ public class FlinkKafkaConsumerBaseMigrationTest { testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); testHarness.setup(); + // restore state from binary snapshot file using legacy method testHarness.initializeStateFromLegacyCheckpoint( getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state")); 
testHarness.open(); - final Throwable[] error = new Throwable[1]; - - // run the source asynchronously - Thread runner = new Thread() { - @Override - public void run() { - try { - consumerFunction.run(new DummySourceContext() { - @Override - public void collect(String element) { - latch.trigger(); - Assert.fail("This should never be called."); - } - - @Override - public void emitWatermark(Watermark mark) { - latch.trigger(); - Assert.fail("This should never be called."); - } - }); - } - catch (Throwable t) { - t.printStackTrace(); - error[0] = t; - } - } - }; - runner.start(); + // assert that there are partitions and is identical to expected list + Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null); + Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty()); + Assert.assertTrue(consumerFunction.getSubscribedPartitions().equals(partitions)); - if (!latch.isTriggered()) { - latch.await(); - } + // assert that no state was restored + Assert.assertTrue(consumerFunction.getRestoredState() == null); consumerOperator.close(); - - runner.join(); - - Assert.assertNull(error[0]); + consumerOperator.cancel(); } + /** Test restoring from a non-empty state taken using Flink 1.1, when some partitions could be found for topics. 
*/ @Test public void testRestoreFromFlink11() throws Exception { - // -------------------------------------------------------------------- - // prepare fake states - // -------------------------------------------------------------------- - - final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>(); - state1.put(new KafkaTopicPartition("abc", 13), 16768L); - state1.put(new KafkaTopicPartition("def", 7), 987654321L); - - final OneShotLatch latch = new OneShotLatch(); - final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class); - - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - Map<KafkaTopicPartition, Long> map = (HashMap<KafkaTopicPartition, Long>) invocationOnMock.getArguments()[0]; - - latch.trigger(); - assertEquals(state1, map); - return null; - } - }).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, Long.class)); - - final List<KafkaTopicPartition> partitions = new ArrayList<>(); partitions.add(new KafkaTopicPartition("abc", 13)); partitions.add(new KafkaTopicPartition("def", 7)); - final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>( - new FetcherFactory<String>() { - private static final long serialVersionUID = -2803131905656983619L; - - @Override - public AbstractFetcher<String, ?> createFetcher() { - return fetcher; - } - }, - partitions); + final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>(partitions); StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator = new StreamSource<>(consumerFunction); @@ -299,91 +131,60 @@ public class FlinkKafkaConsumerBaseMigrationTest { testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); testHarness.setup(); + // restore state from binary snapshot file using legacy method testHarness.initializeStateFromLegacyCheckpoint( getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot")); testHarness.open(); - 
final Throwable[] error = new Throwable[1]; + // assert that there are partitions and is identical to expected list + Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null); + Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty()); + Assert.assertEquals(partitions, consumerFunction.getSubscribedPartitions()); - // run the source asynchronously - Thread runner = new Thread() { - @Override - public void run() { - try { - consumerFunction.run(new DummySourceContext() { - @Override - public void collect(String element) { - //latch.trigger(); - } - }); - } - catch (Throwable t) { - t.printStackTrace(); - error[0] = t; - } - } - }; - runner.start(); + // the expected state in "kafka-consumer-migration-test-flink1.1-snapshot" + final HashMap<KafkaTopicPartition, Long> expectedState = new HashMap<>(); + expectedState.put(new KafkaTopicPartition("abc", 13), 16768L); + expectedState.put(new KafkaTopicPartition("def", 7), 987654321L); - if (!latch.isTriggered()) { - latch.await(); - } + // assert that state is correctly restored from legacy checkpoint + Assert.assertTrue(consumerFunction.getRestoredState() != null); + Assert.assertEquals(expectedState, consumerFunction.getRestoredState()); consumerOperator.close(); - - runner.join(); - - Assert.assertNull(error[0]); - } - - private abstract static class DummySourceContext - implements SourceFunction.SourceContext<String> { - - private final Object lock = new Object(); - - @Override - public void collectWithTimestamp(String element, long timestamp) { - } - - @Override - public void emitWatermark(Watermark mark) { - } - - @Override - public Object getCheckpointLock() { - return lock; - } - - @Override - public void close() { - } + consumerOperator.cancel(); } // ------------------------------------------------------------------------ - private interface FetcherFactory<T> extends Serializable { - AbstractFetcher<T, ?> createFetcher(); + private static String getResourceFilename(String filename) { 
+ ClassLoader cl = FlinkKafkaConsumerBaseMigrationTest.class.getClassLoader(); + URL resource = cl.getResource(filename); + if (resource == null) { + throw new NullPointerException("Missing snapshot resource."); + } + return resource.getFile(); } private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> { private static final long serialVersionUID = 1L; - private final FetcherFactory<T> fetcherFactory; - private final List<KafkaTopicPartition> partitions; @SuppressWarnings("unchecked") - DummyFlinkKafkaConsumer( - FetcherFactory<T> fetcherFactory, - List<KafkaTopicPartition> partitions) { + DummyFlinkKafkaConsumer(List<KafkaTopicPartition> partitions) { super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class)); - this.fetcherFactory = fetcherFactory; this.partitions = partitions; } @Override - protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception { - return fetcherFactory.createFetcher(); + protected AbstractFetcher<T, ?> createFetcher( + SourceContext<T> sourceContext, + List<KafkaTopicPartition> thisSubtaskPartitions, + HashMap<KafkaTopicPartition, Long> restoredSnapshotState, + SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + StreamingRuntimeContext runtimeContext) throws Exception { + return mock(AbstractFetcher.class); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index b96ba30..980a025 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -361,16 +361,19 @@ public class FlinkKafkaConsumerBaseTest { } @Override - protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception { - AbstractFetcher<T, ?> fetcher = mock(AbstractFetcher.class); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - Assert.fail("Trying to restore offsets even though there was no restore state."); - return null; - } - }).when(fetcher).restoreOffsets(any(HashMap.class)); - return fetcher; + @SuppressWarnings("unchecked") + protected AbstractFetcher<T, ?> createFetcher( + SourceContext<T> sourceContext, + List<KafkaTopicPartition> thisSubtaskPartitions, + HashMap<KafkaTopicPartition, Long> restoredSnapshotState, + SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + StreamingRuntimeContext runtimeContext) throws Exception { + if (restoredSnapshotState != null) { + Assert.fail("Trying to restore offsets even though there was no restore state."); + return null; + } + return mock(AbstractFetcher.class); } @Override 
http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index d7fab88..cb8b0d0 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -61,6 +61,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators; import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper; import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils; @@ -234,7 +235,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { final Long l50 = 50L; // the final committed offset in Kafka should be 50 final long deadline = 30000 + System.currentTimeMillis(); - KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); do { Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0); @@ -281,7 +282,7 @@ public 
abstract class KafkaConsumerTestBase extends KafkaTestBase { final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1); - KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); Long o1; Long o2; @@ -348,7 +349,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { (o3 != null) ? o3.intValue() : 0 )); - readSequence(env2, standardProps, topicName, partitionsToValuesCountAndStartOffset); + readSequence(env2, StartupMode.GROUP_OFFSETS, standardProps, topicName, partitionsToValuesCountAndStartOffset); kafkaOffsetHandler.close(); deleteTestTopic(topicName); @@ -402,7 +403,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { }; runner.start(); - KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); final Long l50 = 50L; // the final committed offset in Kafka should be 50 final long deadline = 30000 + System.currentTimeMillis(); @@ -438,6 +439,217 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { kafkaOffsetHandler.close(); deleteTestTopic(topicName); } + + /** + * This test ensures that when explicitly set to start from earliest record, the consumer + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka. 
+ */ + public void runStartFromEarliestOffsets() throws Exception { + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + final int parallelism = 3; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored + + // the committed offsets should be ignored + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0); + + kafkaOffsetHandler.close(); + deleteTestTopic(topicName); + } + + /** + * This test ensures that when explicitly set to start from latest record, the consumer + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka. 
+ */ + public void runStartFromLatestOffsets() throws Exception { + // 50 records written to each of 3 partitions before launching a latest-starting consuming job + final int parallelism = 3; + final int recordsInEachPartition = 50; + + // each partition will be written an extra 200 records + final int extraRecordsInEachPartition = 200; + + // all already existing data in the topic, before the consuming topology has started, should be ignored + final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1); + + // the committed offsets should be ignored + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + // job names for the topologies for writing and consuming the extra records + final String consumeExtraRecordsJobName = "Consume Extra Records Job"; + final String writeExtraRecordsJobName = "Write Extra Records Job"; + + // serialization / deserialization schemas for writing and consuming the extra records + final TypeInformation<Tuple2<Integer, Integer>> resultType = + TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {}); + + final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema = + new KeyedSerializationSchemaWrapper<>( + new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); + + final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema = + new KeyedDeserializationSchemaWrapper<>( + new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); + + // set up and run the latest-consuming job + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + final Properties readProps = new 
Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored + + FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> latestReadingConsumer = + kafkaServer.getConsumer(topicName, deserSchema, readProps); + latestReadingConsumer.setStartFromLatest(); + + env + .addSource(latestReadingConsumer).setParallelism(parallelism) + .flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Object>() { + @Override + public void flatMap(Tuple2<Integer, Integer> value, Collector<Object> out) throws Exception { + if (value.f1 - recordsInEachPartition < 0) { + throw new RuntimeException("test failed; consumed a record that was previously written: " + value); + } + } + }).setParallelism(1) + .addSink(new DiscardingSink<>()); + + final AtomicReference<Throwable> error = new AtomicReference<>(); + Thread consumeThread = new Thread(new Runnable() { + @Override + public void run() { + try { + env.execute(consumeExtraRecordsJobName); + } catch (Throwable t) { + if (!(t.getCause() instanceof JobCancellationException)) { + error.set(t); + } + } + } + }); + consumeThread.start(); + + // wait until the consuming job has started, to be extra safe + JobManagerCommunicationUtils.waitUntilJobIsRunning( + flink.getLeaderGateway(timeout), + consumeExtraRecordsJobName); + + // setup the extra records writing job + final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + + DataStream<Tuple2<Integer, Integer>> extraRecordsStream = env2 + .addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() { + + private boolean running = true; + + @Override + public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { + int count = recordsInEachPartition; // the extra records should start from the last written value + int partition = getRuntimeContext().getIndexOfThisSubtask(); + + while (running && count < recordsInEachPartition + 
extraRecordsInEachPartition) { + ctx.collect(new Tuple2<>(partition, count)); + count++; + } + } + + @Override + public void cancel() { + running = false; + } + }).setParallelism(parallelism); + + kafkaServer.produceIntoKafka(extraRecordsStream, topicName, serSchema, readProps, null); + + try { + env2.execute(writeExtraRecordsJobName); + } + catch (Exception e) { + throw new RuntimeException("Writing extra records failed", e); + } + + // cancel the consume job after all extra records are written + JobManagerCommunicationUtils.cancelCurrentJob( + flink.getLeaderGateway(timeout), + consumeExtraRecordsJobName); + consumeThread.join(); + + kafkaOffsetHandler.close(); + deleteTestTopic(topicName); + + // check whether the consuming thread threw any test errors; + // test will fail here if the consume job had incorrectly read any records other than the extra records + final Throwable consumerError = error.get(); + if (consumerError != null) { + throw new Exception("Exception in the consuming thread", consumerError); + } + } + + /** + * This test ensures that the consumer correctly uses group offsets in Kafka, and defaults to "auto.offset.reset" + * behaviour when necessary, when explicitly configured to start from group offsets. 
+ * + * The partitions and their committed group offsets are set up as: + * partition 0 --> committed offset 23 + * partition 1 --> no commit offset + * partition 2 --> committed offset 43 + * + * When configured to start from group offsets, each partition should read: + * partition 0 --> start from offset 23, read to offset 49 (27 records) + * partition 1 --> default to "auto.offset.reset" (set to earliest), so start from offset 0, read to offset 49 (50 records) + * partition 2 --> start from offset 43, read to offset 49 (7 records) + */ + public void runStartFromGroupOffsets() throws Exception { + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + final int parallelism = 3; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromGroupOffsetsTopic", recordsInEachPartition, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "earliest"); + + // the committed group offsets should be used as starting points + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); + + // only partitions 0 and 2 have group offsets committed + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + Map<Integer, Tuple2<Integer, Integer>> partitionsToValueCountAndStartOffsets = new HashMap<>(); + partitionsToValueCountAndStartOffsets.put(0, new Tuple2<>(27, 23)); // partition 0 should read offset 23-49 + partitionsToValueCountAndStartOffsets.put(1, new Tuple2<>(50, 0)); // partition 1 should read offset 0-49 + partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(7, 43)); // partition 2 should read offset 
43-49 + + readSequence(env, StartupMode.GROUP_OFFSETS, readProps, topicName, partitionsToValueCountAndStartOffsets); + + kafkaOffsetHandler.close(); + deleteTestTopic(topicName); + } /** * Ensure Kafka is working on both producer and consumer side. @@ -1014,27 +1226,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { } } - /** - * Serialization scheme forwarding byte[] records. - */ - private static class ByteArraySerializationSchema implements KeyedSerializationSchema<byte[]> { - - @Override - public byte[] serializeKey(byte[] element) { - return null; - } - - @Override - public byte[] serializeValue(byte[] element) { - return element; - } - - @Override - public String getTargetTopic(byte[] element) { - return null; - } - } - private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>, KeyedSerializationSchema<Tuple3<Integer, Integer, String>> { @@ -1588,7 +1779,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { * The method allows to individually specify the expected starting offset and total read value count of each partition. * The job will be considered successful only if all partition read results match the start offset and value count criteria. 
*/ - protected void readSequence(StreamExecutionEnvironment env, Properties cc, + protected void readSequence(final StreamExecutionEnvironment env, + final StartupMode startupMode, + final Properties cc, final String topicName, final Map<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset) throws Exception { final int sourceParallelism = partitionsToValuesCountAndStartOffset.keySet().size(); @@ -1607,6 +1800,17 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { // create the consumer cc.putAll(secureProps); FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deser, cc); + switch (startupMode) { + case EARLIEST: + consumer.setStartFromEarliest(); + break; + case LATEST: + consumer.setStartFromLatest(); + break; + case GROUP_OFFSETS: + consumer.setStartFromGroupOffsets(); + break; + } DataStream<Tuple2<Integer, Integer>> source = env .addSource(consumer).setParallelism(sourceParallelism) @@ -1670,18 +1874,21 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { } /** - * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, Properties, String, Map)} to + * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, StartupMode, Properties, String, Map)} to * expect reading from the same start offset and the same value count for all partitions of a single Kafka topic. 
*/ - protected void readSequence(StreamExecutionEnvironment env, Properties cc, + protected void readSequence(final StreamExecutionEnvironment env, + final StartupMode startupMode, + final Properties cc, final int sourceParallelism, final String topicName, - final int valuesCount, final int startFrom) throws Exception { + final int valuesCount, + final int startFrom) throws Exception { HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<>(); for (int i = 0; i < sourceParallelism; i++) { partitionsToValuesCountAndStartOffset.put(i, new Tuple2<>(valuesCount, startFrom)); } - readSequence(env, cc, topicName, partitionsToValuesCountAndStartOffset); + readSequence(env, startupMode, cc, topicName, partitionsToValuesCountAndStartOffset); } protected String writeSequence( http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java index dccf698..6a1f702 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java @@ -252,7 +252,6 @@ public class KafkaShortRetentionTestBase implements Serializable { try { env.execute("Test auto offset reset none"); } catch(Throwable e) { - System.out.println("MESSAGE: " + e.getCause().getCause().getMessage()); // check if correct exception has been thrown 
if(!e.getCause().getCause().getMessage().contains("Unable to find previous offset") // kafka 0.8 && !e.getCause().getCause().getMessage().contains("Undefined offset with no reset policy for partition") // kafka 0.9 http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java index 10c7b86..7f2a816 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java @@ -94,10 +94,11 @@ public abstract class KafkaTestEnvironment { public interface KafkaOffsetHandler { Long getCommittedOffset(String topicName, int partition); + void setCommittedOffset(String topicName, int partition, long offset); void close(); } - public abstract KafkaOffsetHandler createOffsetHandler(Properties props); + public abstract KafkaOffsetHandler createOffsetHandler(); // -- leader failure simulation http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java index 0b3507a..f2091f0 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java @@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -31,6 +32,7 @@ import org.junit.Test; import javax.annotation.Nullable; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -54,6 +56,7 @@ public class AbstractFetcherTimestampsTest { TestFetcher<Long> fetcher = new TestFetcher<>( sourceContext, originalPartitions, + null, null, /* periodic watermark assigner */ new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()), processingTimeProvider, @@ -128,6 +131,7 @@ public class AbstractFetcherTimestampsTest { TestFetcher<Long> fetcher = new TestFetcher<>( sourceContext, originalPartitions, + null, new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()), null, /* punctuated watermarks assigner*/ processingTimeService, @@ -199,12 +203,23 @@ public class AbstractFetcherTimestampsTest { protected TestFetcher( SourceContext<T> 
sourceContext, List<KafkaTopicPartition> assignedPartitions, + HashMap<KafkaTopicPartition, Long> restoredSnapshotState, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval) throws Exception { - super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, processingTimeProvider, autoWatermarkInterval, TestFetcher.class.getClassLoader(), false); + super( + sourceContext, + assignedPartitions, + restoredSnapshotState, + watermarksPeriodic, + watermarksPunctuated, + processingTimeProvider, + autoWatermarkInterval, + TestFetcher.class.getClassLoader(), + StartupMode.LATEST, + false); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java index acdad5a..131325f 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java @@ -34,7 +34,6 @@ public class JobManagerCommunicationUtils { private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS); - public static void waitUntilNoJobIsRunning(ActorGateway jobManager) throws Exception { while (true) { // find the jobID @@ -53,6 +52,32 @@ 
public class JobManagerCommunicationUtils { } } + public static void waitUntilJobIsRunning(ActorGateway jobManager, String name) throws Exception { + while (true) { + Future<Object> listResponse = jobManager.ask( + JobManagerMessages.getRequestRunningJobsStatus(), + askTimeout); + + List<JobStatusMessage> jobs; + try { + Object result = Await.result(listResponse, askTimeout); + jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages(); + } + catch (Exception e) { + throw new Exception("Could not wait for job to start - failed to retrieve running jobs from the JobManager.", e); + } + + // see if the running jobs contain the requested job + for (JobStatusMessage job : jobs) { + if (job.getJobName().equals(name)) { + return; + } + } + + Thread.sleep(50); + } + } + public static void cancelCurrentJob(ActorGateway jobManager) throws Exception { cancelCurrentJob(jobManager, null); }
