Repository: flink
Updated Branches:
  refs/heads/master 6c310a762 -> 7477c5b57


http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index fa93138..38a3ce8 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -17,14 +17,11 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
@@ -32,21 +29,14 @@ import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
-import java.io.Serializable;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyMapOf;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 
 /**
@@ -58,55 +48,13 @@ import static org.mockito.Mockito.mock;
  */
 public class FlinkKafkaConsumerBaseMigrationTest {
 
-       private static String getResourceFilename(String filename) {
-               ClassLoader cl = 
FlinkKafkaConsumerBaseMigrationTest.class.getClassLoader();
-               URL resource = cl.getResource(filename);
-               if (resource == null) {
-                       throw new NullPointerException("Missing snapshot 
resource.");
-               }
-               return resource.getFile();
-       }
-
+       /** Test restoring from an legacy empty state, when no partitions could 
be found for topics. */
        @Test
        public void testRestoreFromFlink11WithEmptyStateNoPartitions() throws 
Exception {
-               // 
--------------------------------------------------------------------
-               //   prepare fake states
-               // 
--------------------------------------------------------------------
+               final DummyFlinkKafkaConsumer<String> consumerFunction =
+                       new 
DummyFlinkKafkaConsumer<>(Collections.<KafkaTopicPartition>emptyList());
 
-               final OneShotLatch latch = new OneShotLatch();
-               final AbstractFetcher<String, ?> fetcher = 
mock(AbstractFetcher.class);
-
-               doAnswer(new Answer() {
-                       @Override
-                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                               latch.trigger();
-                               Assert.fail("This should never be called");
-                               return null;
-                       }
-               
}).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, 
Long.class));
-
-               doAnswer(new Answer<Void>() {
-                       @Override
-                       public Void answer(InvocationOnMock invocation) throws 
Throwable {
-                               latch.trigger();
-                               Assert.fail("This should never be called");
-                               return null;
-                       }
-               }).when(fetcher).runFetchLoop();
-
-               final DummyFlinkKafkaConsumer<String> consumerFunction = new 
DummyFlinkKafkaConsumer<>(
-                               new FetcherFactory<String>() {
-                                       private static final long 
serialVersionUID = -2803131905656983619L;
-
-                                       @Override
-                                       public AbstractFetcher<String, ?> 
createFetcher() {
-                                               return fetcher;
-                                       }
-                               },
-                               Collections.<KafkaTopicPartition>emptyList());
-
-               StreamSource<String, DummyFlinkKafkaConsumer<String>> 
consumerOperator =
-                       new StreamSource<>(consumerFunction);
+               StreamSource<String, DummyFlinkKafkaConsumer<String>> 
consumerOperator = new StreamSource<>(consumerFunction);
 
                final AbstractStreamOperatorTestHarness<String> testHarness =
                        new 
AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
@@ -114,88 +62,30 @@ public class FlinkKafkaConsumerBaseMigrationTest {
                
testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
                testHarness.setup();
+               // restore state from binary snapshot file using legacy method
                testHarness.initializeStateFromLegacyCheckpoint(
                        
getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
                testHarness.open();
 
-               final Throwable[] error = new Throwable[1];
-
-               // run the source asynchronously
-               Thread runner = new Thread() {
-                       @Override
-                       public void run() {
-                               try {
-                                       consumerFunction.run(new 
DummySourceContext() {
-                                               @Override
-                                               public void collect(String 
element) {
-                                                       latch.trigger();
-                                                       Assert.fail("This 
should never be called.");
-                                               }
-
-                                               @Override
-                                               public void 
emitWatermark(Watermark mark) {
-                                                       latch.trigger();
-                                                       
assertEquals(Long.MAX_VALUE, mark.getTimestamp());
-                                               }
-                                       });
-                               }
-                               catch (Throwable t) {
-                                       t.printStackTrace();
-                                       error[0] = t;
-                               }
-                       }
-               };
-               runner.start();
+               // assert that no partitions were found and is empty
+               Assert.assertTrue(consumerFunction.getSubscribedPartitions() != 
null);
+               
Assert.assertTrue(consumerFunction.getSubscribedPartitions().isEmpty());
 
-               if (!latch.isTriggered()) {
-                       latch.await();
-               }
+               // assert that no state was restored
+               Assert.assertTrue(consumerFunction.getRestoredState() == null);
 
                consumerOperator.close();
-
                consumerOperator.cancel();
-               runner.interrupt();
-               runner.join();
-
-               Assert.assertNull(error[0]);
        }
 
+       /** Test restoring from an empty state taken using Flink 1.1, when some 
partitions could be found for topics. */
        @Test
        public void testRestoreFromFlink11WithEmptyStateWithPartitions() throws 
Exception {
-               final OneShotLatch latch = new OneShotLatch();
-               final AbstractFetcher<String, ?> fetcher = 
mock(AbstractFetcher.class);
-
-               doAnswer(new Answer() {
-                       @Override
-                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                               latch.trigger();
-                               Assert.fail("This should never be called");
-                               return null;
-                       }
-               
}).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, 
Long.class));
-
-               doAnswer(new Answer<Void>() {
-                       @Override
-                       public Void answer(InvocationOnMock invocation) throws 
Throwable {
-                               latch.trigger();
-                               return null;
-                       }
-               }).when(fetcher).runFetchLoop();
-
                final List<KafkaTopicPartition> partitions = new ArrayList<>();
                partitions.add(new KafkaTopicPartition("abc", 13));
                partitions.add(new KafkaTopicPartition("def", 7));
 
-               final DummyFlinkKafkaConsumer<String> consumerFunction = new 
DummyFlinkKafkaConsumer<>(
-                               new FetcherFactory<String>() {
-                                       private static final long 
serialVersionUID = -2803131905656983619L;
-
-                                       @Override
-                                       public AbstractFetcher<String, ?> 
createFetcher() {
-                                               return fetcher;
-                                       }
-                               },
-                               partitions);
+               final DummyFlinkKafkaConsumer<String> consumerFunction = new 
DummyFlinkKafkaConsumer<>(partitions);
 
                StreamSource<String, DummyFlinkKafkaConsumer<String>> 
consumerOperator =
                        new StreamSource<>(consumerFunction);
@@ -206,89 +96,31 @@ public class FlinkKafkaConsumerBaseMigrationTest {
                
testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
                testHarness.setup();
+               // restore state from binary snapshot file using legacy method
                testHarness.initializeStateFromLegacyCheckpoint(
                        
getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
                testHarness.open();
 
-               final Throwable[] error = new Throwable[1];
-
-               // run the source asynchronously
-               Thread runner = new Thread() {
-                       @Override
-                       public void run() {
-                               try {
-                                       consumerFunction.run(new 
DummySourceContext() {
-                                               @Override
-                                               public void collect(String 
element) {
-                                                       latch.trigger();
-                                                       Assert.fail("This 
should never be called.");
-                                               }
-
-                                               @Override
-                                               public void 
emitWatermark(Watermark mark) {
-                                                       latch.trigger();
-                                                       Assert.fail("This 
should never be called.");
-                                               }
-                                       });
-                               }
-                               catch (Throwable t) {
-                                       t.printStackTrace();
-                                       error[0] = t;
-                               }
-                       }
-               };
-               runner.start();
+               // assert that there are partitions and is identical to 
expected list
+               Assert.assertTrue(consumerFunction.getSubscribedPartitions() != 
null);
+               
Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty());
+               
Assert.assertTrue(consumerFunction.getSubscribedPartitions().equals(partitions));
 
-               if (!latch.isTriggered()) {
-                       latch.await();
-               }
+               // assert that no state was restored
+               Assert.assertTrue(consumerFunction.getRestoredState() == null);
 
                consumerOperator.close();
-
-               runner.join();
-
-               Assert.assertNull(error[0]);
+               consumerOperator.cancel();
        }
 
+       /** Test restoring from a non-empty state taken using Flink 1.1, when 
some partitions could be found for topics. */
        @Test
        public void testRestoreFromFlink11() throws Exception {
-               // 
--------------------------------------------------------------------
-               //   prepare fake states
-               // 
--------------------------------------------------------------------
-
-               final HashMap<KafkaTopicPartition, Long> state1 = new 
HashMap<>();
-               state1.put(new KafkaTopicPartition("abc", 13), 16768L);
-               state1.put(new KafkaTopicPartition("def", 7), 987654321L);
-
-               final OneShotLatch latch = new OneShotLatch();
-               final AbstractFetcher<String, ?> fetcher = 
mock(AbstractFetcher.class);
-
-               doAnswer(new Answer() {
-                       @Override
-                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                               Map<KafkaTopicPartition, Long> map = 
(HashMap<KafkaTopicPartition, Long>) invocationOnMock.getArguments()[0];
-
-                               latch.trigger();
-                               assertEquals(state1, map);
-                               return null;
-                       }
-               
}).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, 
Long.class));
-
-
                final List<KafkaTopicPartition> partitions = new ArrayList<>();
                partitions.add(new KafkaTopicPartition("abc", 13));
                partitions.add(new KafkaTopicPartition("def", 7));
 
-               final DummyFlinkKafkaConsumer<String> consumerFunction = new 
DummyFlinkKafkaConsumer<>(
-                               new FetcherFactory<String>() {
-                                       private static final long 
serialVersionUID = -2803131905656983619L;
-
-                                       @Override
-                                       public AbstractFetcher<String, ?> 
createFetcher() {
-                                               return fetcher;
-                                       }
-                               },
-                               partitions);
+               final DummyFlinkKafkaConsumer<String> consumerFunction = new 
DummyFlinkKafkaConsumer<>(partitions);
 
                StreamSource<String, DummyFlinkKafkaConsumer<String>> 
consumerOperator =
                        new StreamSource<>(consumerFunction);
@@ -299,91 +131,60 @@ public class FlinkKafkaConsumerBaseMigrationTest {
                
testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
                testHarness.setup();
+               // restore state from binary snapshot file using legacy method
                testHarness.initializeStateFromLegacyCheckpoint(
                        
getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot"));
                testHarness.open();
 
-               final Throwable[] error = new Throwable[1];
+               // assert that there are partitions and is identical to 
expected list
+               Assert.assertTrue(consumerFunction.getSubscribedPartitions() != 
null);
+               
Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty());
+               Assert.assertEquals(partitions, 
consumerFunction.getSubscribedPartitions());
 
-               // run the source asynchronously
-               Thread runner = new Thread() {
-                       @Override
-                       public void run() {
-                               try {
-                                       consumerFunction.run(new 
DummySourceContext() {
-                                               @Override
-                                               public void collect(String 
element) {
-                                                       //latch.trigger();
-                                               }
-                                       });
-                               }
-                               catch (Throwable t) {
-                                       t.printStackTrace();
-                                       error[0] = t;
-                               }
-                       }
-               };
-               runner.start();
+               // the expected state in 
"kafka-consumer-migration-test-flink1.1-snapshot"
+               final HashMap<KafkaTopicPartition, Long> expectedState = new 
HashMap<>();
+               expectedState.put(new KafkaTopicPartition("abc", 13), 16768L);
+               expectedState.put(new KafkaTopicPartition("def", 7), 
987654321L);
 
-               if (!latch.isTriggered()) {
-                       latch.await();
-               }
+               // assert that state is correctly restored from legacy 
checkpoint
+               Assert.assertTrue(consumerFunction.getRestoredState() != null);
+               Assert.assertEquals(expectedState, 
consumerFunction.getRestoredState());
 
                consumerOperator.close();
-
-               runner.join();
-
-               Assert.assertNull(error[0]);
-       }
-
-       private abstract static class DummySourceContext
-               implements SourceFunction.SourceContext<String> {
-
-               private final Object lock = new Object();
-
-               @Override
-               public void collectWithTimestamp(String element, long 
timestamp) {
-               }
-
-               @Override
-               public void emitWatermark(Watermark mark) {
-               }
-
-               @Override
-               public Object getCheckpointLock() {
-                       return lock;
-               }
-
-               @Override
-               public void close() {
-               }
+               consumerOperator.cancel();
        }
 
        // 
------------------------------------------------------------------------
 
-       private interface FetcherFactory<T> extends Serializable {
-               AbstractFetcher<T, ?> createFetcher();
+       private static String getResourceFilename(String filename) {
+               ClassLoader cl = 
FlinkKafkaConsumerBaseMigrationTest.class.getClassLoader();
+               URL resource = cl.getResource(filename);
+               if (resource == null) {
+                       throw new NullPointerException("Missing snapshot 
resource.");
+               }
+               return resource.getFile();
        }
 
        private static class DummyFlinkKafkaConsumer<T> extends 
FlinkKafkaConsumerBase<T> {
                private static final long serialVersionUID = 1L;
 
-               private final FetcherFactory<T> fetcherFactory;
-
                private final List<KafkaTopicPartition> partitions;
 
                @SuppressWarnings("unchecked")
-               DummyFlinkKafkaConsumer(
-                               FetcherFactory<T> fetcherFactory,
-                               List<KafkaTopicPartition> partitions) {
+               DummyFlinkKafkaConsumer(List<KafkaTopicPartition> partitions) {
                        super(Arrays.asList("dummy-topic"), 
(KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class));
-                       this.fetcherFactory = fetcherFactory;
                        this.partitions = partitions;
                }
 
                @Override
-               protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> 
sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, 
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, 
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, 
StreamingRuntimeContext runtimeContext) throws Exception {
-                       return fetcherFactory.createFetcher();
+               protected AbstractFetcher<T, ?> createFetcher(
+                               SourceContext<T> sourceContext,
+                               List<KafkaTopicPartition> thisSubtaskPartitions,
+                               HashMap<KafkaTopicPartition, Long> 
restoredSnapshotState,
+                               
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+                               
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+                               StreamingRuntimeContext runtimeContext) throws 
Exception {
+                       return mock(AbstractFetcher.class);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index b96ba30..980a025 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -361,16 +361,19 @@ public class FlinkKafkaConsumerBaseTest {
                }
 
                @Override
-               protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> 
sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, 
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, 
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, 
StreamingRuntimeContext runtimeContext) throws Exception {
-                       AbstractFetcher<T, ?> fetcher = 
mock(AbstractFetcher.class);
-                       doAnswer(new Answer() {
-                               @Override
-                               public Object answer(InvocationOnMock 
invocationOnMock) throws Throwable {
-                                       Assert.fail("Trying to restore offsets 
even though there was no restore state.");
-                                       return null;
-                               }
-                       }).when(fetcher).restoreOffsets(any(HashMap.class));
-                       return fetcher;
+               @SuppressWarnings("unchecked")
+               protected AbstractFetcher<T, ?> createFetcher(
+                               SourceContext<T> sourceContext,
+                               List<KafkaTopicPartition> thisSubtaskPartitions,
+                               HashMap<KafkaTopicPartition, Long> 
restoredSnapshotState,
+                               
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+                               
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+                               StreamingRuntimeContext runtimeContext) throws 
Exception {
+                       if (restoredSnapshotState != null) {
+                               Assert.fail("Trying to restore offsets even 
though there was no restore state.");
+                               return null;
+                       }
+                       return mock(AbstractFetcher.class);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index d7fab88..cb8b0d0 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -61,6 +61,7 @@ import 
org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
@@ -234,7 +235,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                final Long l50 = 50L; // the final committed offset in Kafka 
should be 50
                final long deadline = 30000 + System.currentTimeMillis();
 
-               KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+               KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler();
 
                do {
                        Long o1 = 
kafkaOffsetHandler.getCommittedOffset(topicName, 0);
@@ -281,7 +282,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
                final String topicName = 
writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
 
-               KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+               KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler();
 
                Long o1;
                Long o2;
@@ -348,7 +349,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        (o3 != null) ? o3.intValue() : 0
                ));
 
-               readSequence(env2, standardProps, topicName, 
partitionsToValuesCountAndStartOffset);
+               readSequence(env2, StartupMode.GROUP_OFFSETS, standardProps, 
topicName, partitionsToValuesCountAndStartOffset);
 
                kafkaOffsetHandler.close();
                deleteTestTopic(topicName);
@@ -402,7 +403,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                };
                runner.start();
 
-               KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+               KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler();
 
                final Long l50 = 50L; // the final committed offset in Kafka 
should be 50
                final long deadline = 30000 + System.currentTimeMillis();
@@ -438,6 +439,217 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                kafkaOffsetHandler.close();
                deleteTestTopic(topicName);
        }
+
+       /**
+        * This test ensures that when explicitly set to start from earliest 
record, the consumer
+        * ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+        */
+       public void runStartFromEarliestOffsets() throws Exception {
+               // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
+               final int parallelism = 3;
+               final int recordsInEachPartition = 50;
+
+               final String topicName = 
writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env.getConfig().disableSysoutLogging();
+               env.setParallelism(parallelism);
+
+               Properties readProps = new Properties();
+               readProps.putAll(standardProps);
+               readProps.setProperty("auto.offset.reset", "latest"); // this 
should be ignored
+
+               // the committed offsets should be ignored
+               KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler();
+               kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+               kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+               kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+               readSequence(env, StartupMode.EARLIEST, readProps, parallelism, 
topicName, recordsInEachPartition, 0);
+
+               kafkaOffsetHandler.close();
+               deleteTestTopic(topicName);
+       }
+
+       /**
+        * This test ensures that when explicitly set to start from latest 
record, the consumer
+        * ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+        */
+       public void runStartFromLatestOffsets() throws Exception {
+               // 50 records written to each of 3 partitions before launching 
a latest-starting consuming job
+               final int parallelism = 3;
+               final int recordsInEachPartition = 50;
+
+               // each partition will be written an extra 200 records
+               final int extraRecordsInEachPartition = 200;
+
+               // all already existing data in the topic, before the consuming 
topology has started, should be ignored
+               final String topicName = 
writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+               // the committed offsets should be ignored
+               KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler();
+               kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+               kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+               kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+               // job names for the topologies for writing and consuming the 
extra records
+               final String consumeExtraRecordsJobName = "Consume Extra 
Records Job";
+               final String writeExtraRecordsJobName = "Write Extra Records 
Job";
+
+               // seriliazation / deserialization schemas for writing and 
consuming the extra records
+               final TypeInformation<Tuple2<Integer, Integer>> resultType =
+                       TypeInformation.of(new TypeHint<Tuple2<Integer, 
Integer>>() {});
+
+               final KeyedSerializationSchema<Tuple2<Integer, Integer>> 
serSchema =
+                       new KeyedSerializationSchemaWrapper<>(
+                               new 
TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+               final KeyedDeserializationSchema<Tuple2<Integer, Integer>> 
deserSchema =
+                       new KeyedDeserializationSchemaWrapper<>(
+                               new 
TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+               // setup and run the latest-consuming job
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env.getConfig().disableSysoutLogging();
+               env.setParallelism(parallelism);
+
+               final Properties readProps = new Properties();
+               readProps.putAll(standardProps);
+               readProps.setProperty("auto.offset.reset", "earliest"); // this 
should be ignored
+
+               FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> 
latestReadingConsumer =
+                       kafkaServer.getConsumer(topicName, deserSchema, 
readProps);
+               latestReadingConsumer.setStartFromLatest();
+
+               env
+                       
.addSource(latestReadingConsumer).setParallelism(parallelism)
+                       .flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, 
Object>() {
+                               @Override
+                               public void flatMap(Tuple2<Integer, Integer> 
value, Collector<Object> out) throws Exception {
+                                       if (value.f1 - recordsInEachPartition < 
0) {
+                                               throw new 
RuntimeException("test failed; consumed a record that was previously written: " 
+ value);
+                                       }
+                               }
+                       }).setParallelism(1)
+                       .addSink(new DiscardingSink<>());
+
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+               Thread consumeThread = new Thread(new Runnable() {
+                       @Override
+                       public void run() {
+                               try {
+                                       env.execute(consumeExtraRecordsJobName);
+                               } catch (Throwable t) {
+                                       if (!(t.getCause() instanceof 
JobCancellationException)) {
+                                               error.set(t);
+                                       }
+                               }
+                       }
+               });
+               consumeThread.start();
+
+               // wait until the consuming job has started, to be extra safe
+               JobManagerCommunicationUtils.waitUntilJobIsRunning(
+                       flink.getLeaderGateway(timeout),
+                       consumeExtraRecordsJobName);
+
+               // setup the extra records writing job
+               final StreamExecutionEnvironment env2 = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+
+               DataStream<Tuple2<Integer, Integer>> extraRecordsStream = env2
+                       .addSource(new 
RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+
+                               private boolean running = true;
+
+                               @Override
+                               public void run(SourceContext<Tuple2<Integer, 
Integer>> ctx) throws Exception {
+                                       int count = recordsInEachPartition; // 
the extra records should start from the last written value
+                                       int partition = 
getRuntimeContext().getIndexOfThisSubtask();
+
+                                       while (running && count < 
recordsInEachPartition + extraRecordsInEachPartition) {
+                                               ctx.collect(new 
Tuple2<>(partition, count));
+                                               count++;
+                                       }
+                               }
+
+                               @Override
+                               public void cancel() {
+                                       running = false;
+                               }
+                       }).setParallelism(parallelism);
+
+               kafkaServer.produceIntoKafka(extraRecordsStream, topicName, 
serSchema, readProps, null);
+
+               try {
+                       env2.execute(writeExtraRecordsJobName);
+               }
+               catch (Exception e) {
+                       throw new RuntimeException("Writing extra records 
failed", e);
+               }
+
+               // cancel the consume job after all extra records are written
+               JobManagerCommunicationUtils.cancelCurrentJob(
+                       flink.getLeaderGateway(timeout),
+                       consumeExtraRecordsJobName);
+               consumeThread.join();
+
+               kafkaOffsetHandler.close();
+               deleteTestTopic(topicName);
+
+               // check whether the consuming thread threw any test errors;
+               // test will fail here if the consume job had incorrectly read 
any records other than the extra records
+               final Throwable consumerError = error.get();
+               if (consumerError != null) {
+                       throw new Exception("Exception in the consuming 
thread", consumerError);
+               }
+       }
+
+       /**
+        * This test ensures that the consumer correctly uses group offsets in 
Kafka, and defaults to "auto.offset.reset"
+        * behaviour when necessary, when explicitly configured to start from 
group offsets.
+        *
+        * The partitions and their committed group offsets are setup as:
+        *      partition 0 --> committed offset 23
+        *      partition 1 --> no commit offset
+        *      partition 2 --> committed offset 43
+        *
+        * When configured to start from group offsets, each partition should 
read:
+        *      partition 0 --> start from offset 23, read to offset 49 (27 
records)
+        *      partition 1 --> default to "auto.offset.reset" (set to 
earliest), so start from offset 0, read to offset 49 (50 records)
+        *      partition 2 --> start from offset 43, read to offset 49 (7 
records)
+        */
+       public void runStartFromGroupOffsets() throws Exception {
+               // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
+               final int parallelism = 3;
+               final int recordsInEachPartition = 50;
+
+               final String topicName = 
writeSequence("testStartFromGroupOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env.getConfig().disableSysoutLogging();
+               env.setParallelism(parallelism);
+
+               Properties readProps = new Properties();
+               readProps.putAll(standardProps);
+               readProps.setProperty("auto.offset.reset", "earliest");
+
+               // the committed group offsets should be used as starting points
+               KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler();
+
+               // only partitions 0 and 2 have group offsets committed
+               kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+               kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+               Map<Integer, Tuple2<Integer, Integer>> 
partitionsToValueCountAndStartOffsets = new HashMap<>();
+               partitionsToValueCountAndStartOffsets.put(0, new Tuple2<>(27, 
23)); // partition 0 should read offset 23-49
+               partitionsToValueCountAndStartOffsets.put(1, new Tuple2<>(50, 
0)); // partition 1 should read offset 0-49
+               partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(7, 
43));      // partition 2 should read offset 43-49
+
+               readSequence(env, StartupMode.GROUP_OFFSETS, readProps, 
topicName, partitionsToValueCountAndStartOffsets);
+
+               kafkaOffsetHandler.close();
+               deleteTestTopic(topicName);
+       }
        
        /**
         * Ensure Kafka is working on both producer and consumer side.
@@ -1014,27 +1226,6 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                }
        }
 
-       /**
-        * Serialization scheme forwarding byte[] records.
-        */
-       private static class ByteArraySerializationSchema implements 
KeyedSerializationSchema<byte[]> {
-
-               @Override
-               public byte[] serializeKey(byte[] element) {
-                       return null;
-               }
-
-               @Override
-               public byte[] serializeValue(byte[] element) {
-                       return element;
-               }
-
-               @Override
-               public String getTargetTopic(byte[] element) {
-                       return null;
-               }
-       }
-
        private static class Tuple2WithTopicSchema implements 
KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
                        KeyedSerializationSchema<Tuple3<Integer, Integer, 
String>> {
 
@@ -1588,7 +1779,9 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
         * The method allows to individually specify the expected starting 
offset and total read value count of each partition.
         * The job will be considered successful only if all partition read 
results match the start offset and value count criteria.
         */
-       protected void readSequence(StreamExecutionEnvironment env, Properties 
cc,
+       protected void readSequence(final StreamExecutionEnvironment env,
+                                                               final 
StartupMode startupMode,
+                                                               final 
Properties cc,
                                                                final String 
topicName,
                                                                final 
Map<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset) 
throws Exception {
                final int sourceParallelism = 
partitionsToValuesCountAndStartOffset.keySet().size();
@@ -1607,6 +1800,17 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                // create the consumer
                cc.putAll(secureProps);
                FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = 
kafkaServer.getConsumer(topicName, deser, cc);
+               switch (startupMode) {
+                       case EARLIEST:
+                               consumer.setStartFromEarliest();
+                               break;
+                       case LATEST:
+                               consumer.setStartFromLatest();
+                               break;
+                       case GROUP_OFFSETS:
+                               consumer.setStartFromGroupOffsets();
+                               break;
+               }
 
                DataStream<Tuple2<Integer, Integer>> source = env
                        .addSource(consumer).setParallelism(sourceParallelism)
@@ -1670,18 +1874,21 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
        }
 
        /**
-        * Variant of {@link 
KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, Properties, 
String, Map)} to
+        * Variant of {@link 
KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, StartupMode, 
Properties, String, Map)} to
         * expect reading from the same start offset and the same value count 
for all partitions of a single Kafka topic.
         */
-       protected void readSequence(StreamExecutionEnvironment env, Properties 
cc,
+       protected void readSequence(final StreamExecutionEnvironment env,
+                                                               final 
StartupMode startupMode,
+                                                               final 
Properties cc,
                                                                final int 
sourceParallelism,
                                                                final String 
topicName,
-                                                               final int 
valuesCount, final int startFrom) throws Exception {
+                                                               final int 
valuesCount,
+                                                               final int 
startFrom) throws Exception {
                HashMap<Integer, Tuple2<Integer, Integer>> 
partitionsToValuesCountAndStartOffset = new HashMap<>();
                for (int i = 0; i < sourceParallelism; i++) {
                        partitionsToValuesCountAndStartOffset.put(i, new 
Tuple2<>(valuesCount, startFrom));
                }
-               readSequence(env, cc, topicName, 
partitionsToValuesCountAndStartOffset);
+               readSequence(env, startupMode, cc, topicName, 
partitionsToValuesCountAndStartOffset);
        }
 
        protected String writeSequence(

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index dccf698..6a1f702 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -252,7 +252,6 @@ public class KafkaShortRetentionTestBase implements 
Serializable {
                try {
                        env.execute("Test auto offset reset none");
                } catch(Throwable e) {
-                       System.out.println("MESSAGE: " + 
e.getCause().getCause().getMessage());
                        // check if correct exception has been thrown
                        
if(!e.getCause().getCause().getMessage().contains("Unable to find previous 
offset")  // kafka 0.8
                         && 
!e.getCause().getCause().getMessage().contains("Undefined offset with no reset 
policy for partition") // kafka 0.9

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 10c7b86..7f2a816 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -94,10 +94,11 @@ public abstract class KafkaTestEnvironment {
 
        public interface KafkaOffsetHandler {
                Long getCommittedOffset(String topicName, int partition);
+               void setCommittedOffset(String topicName, int partition, long 
offset);
                void close();
        }
 
-       public abstract KafkaOffsetHandler createOffsetHandler(Properties 
props);
+       public abstract KafkaOffsetHandler createOffsetHandler();
 
        // -- leader failure simulation
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 0b3507a..f2091f0 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -31,6 +32,7 @@ import org.junit.Test;
 
 import javax.annotation.Nullable;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -54,6 +56,7 @@ public class AbstractFetcherTimestampsTest {
                TestFetcher<Long> fetcher = new TestFetcher<>(
                                sourceContext,
                                originalPartitions,
+                               null,
                                null, /* periodic watermark assigner */
                                new 
SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new 
PunctuatedTestExtractor()),
                                processingTimeProvider,
@@ -128,6 +131,7 @@ public class AbstractFetcherTimestampsTest {
                TestFetcher<Long> fetcher = new TestFetcher<>(
                                sourceContext,
                                originalPartitions,
+                               null,
                                new 
SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new 
PeriodicTestExtractor()),
                                null, /* punctuated watermarks assigner*/
                                processingTimeService,
@@ -199,12 +203,23 @@ public class AbstractFetcherTimestampsTest {
                protected TestFetcher(
                                SourceContext<T> sourceContext,
                                List<KafkaTopicPartition> assignedPartitions,
+                               HashMap<KafkaTopicPartition, Long> 
restoredSnapshotState,
                                
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
                                
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
                                ProcessingTimeService processingTimeProvider,
                                long autoWatermarkInterval) throws Exception
                {
-                       super(sourceContext, assignedPartitions, 
watermarksPeriodic, watermarksPunctuated, processingTimeProvider, 
autoWatermarkInterval, TestFetcher.class.getClassLoader(), false);
+                       super(
+                               sourceContext,
+                               assignedPartitions,
+                               restoredSnapshotState,
+                               watermarksPeriodic,
+                               watermarksPunctuated,
+                               processingTimeProvider,
+                               autoWatermarkInterval,
+                               TestFetcher.class.getClassLoader(),
+                               StartupMode.LATEST,
+                               false);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7477c5b5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
index acdad5a..131325f 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
@@ -34,7 +34,6 @@ public class JobManagerCommunicationUtils {
 
        private static final FiniteDuration askTimeout = new FiniteDuration(30, 
TimeUnit.SECONDS);
 
-
        public static void waitUntilNoJobIsRunning(ActorGateway jobManager) 
throws Exception {
                while (true) {
                        // find the jobID
@@ -53,6 +52,32 @@ public class JobManagerCommunicationUtils {
                }
        }
 
+       public static void waitUntilJobIsRunning(ActorGateway jobManager, 
String name) throws Exception {
+               while (true) {
+                       Future<Object> listResponse = jobManager.ask(
+                               
JobManagerMessages.getRequestRunningJobsStatus(),
+                               askTimeout);
+
+                       List<JobStatusMessage> jobs;
+                       try {
+                               Object result = Await.result(listResponse, 
askTimeout);
+                               jobs = ((JobManagerMessages.RunningJobsStatus) 
result).getStatusMessages();
+                       }
+                       catch (Exception e) {
+                               throw new Exception("Could not wait for job to 
start - failed to retrieve running jobs from the JobManager.", e);
+                       }
+
+                       // see if the running jobs contain the requested job
+                       for (JobStatusMessage job : jobs) {
+                               if (job.getJobName().equals(name)) {
+                                       return;
+                               }
+                       }
+
+                       Thread.sleep(50);
+               }
+       }
+
        public static void cancelCurrentJob(ActorGateway jobManager) throws 
Exception {
                cancelCurrentJob(jobManager, null);
        }

Reply via email to