[FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset determination to 
open()

This closes #3378.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed68fedb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed68fedb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed68fedb

Branch: refs/heads/master
Commit: ed68fedbe90db03823d75a020510ad3c344fa73e
Parents: 72f56d1
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Tue Feb 21 23:05:32 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Feb 28 00:54:48 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer010.java |   9 +-
 .../kafka/internal/Kafka010Fetcher.java         |  12 +-
 .../internal/KafkaConsumerCallBridge010.java    |   9 +-
 .../connectors/kafka/Kafka010FetcherTest.java   |  23 +-
 .../connectors/kafka/FlinkKafkaConsumer08.java  |  22 +-
 .../kafka/internals/Kafka08Fetcher.java         |  77 +++----
 .../kafka/internals/ZookeeperOffsetHandler.java |  18 +-
 .../connectors/kafka/FlinkKafkaConsumer09.java  |   9 +-
 .../kafka/internal/Kafka09Fetcher.java          |  17 +-
 .../kafka/internal/KafkaConsumerCallBridge.java |  12 +-
 .../kafka/internal/KafkaConsumerThread.java     |  79 ++-----
 .../connectors/kafka/Kafka09FetcherTest.java    |  23 +-
 .../kafka/FlinkKafkaConsumerBase.java           | 187 ++++++++--------
 .../connectors/kafka/config/StartupMode.java    |  20 +-
 .../kafka/internals/AbstractFetcher.java        |  89 ++++----
 .../internals/KafkaTopicPartitionState.java     |  10 +-
 .../KafkaTopicPartitionStateSentinel.java       |  55 +++++
 .../FlinkKafkaConsumerBaseMigrationTest.java    |  33 ++-
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  20 +-
 .../KafkaConsumerPartitionAssignmentTest.java   | 222 ++++++++++++-------
 .../AbstractFetcherTimestampsTest.java          |  37 ++--
 21 files changed, 510 insertions(+), 473 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 3a58216..716fa19 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -29,7 +29,7 @@ import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaW
 import org.apache.flink.util.SerializedValue;
 
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.Map;
 import java.util.List;
 import java.util.Properties;
 
@@ -128,8 +128,7 @@ public class FlinkKafkaConsumer010<T> extends 
FlinkKafkaConsumer09<T> {
        @Override
        protected AbstractFetcher<T, ?> createFetcher(
                        SourceContext<T> sourceContext,
-                       List<KafkaTopicPartition> thisSubtaskPartitions,
-                       HashMap<KafkaTopicPartition, Long> 
restoredSnapshotState,
+                       Map<KafkaTopicPartition, Long> 
assignedPartitionsWithInitialOffsets,
                        SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
                        SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
                        StreamingRuntimeContext runtimeContext) throws 
Exception {
@@ -138,8 +137,7 @@ public class FlinkKafkaConsumer010<T> extends 
FlinkKafkaConsumer09<T> {
 
                return new Kafka010Fetcher<>(
                                sourceContext,
-                               thisSubtaskPartitions,
-                               restoredSnapshotState,
+                               assignedPartitionsWithInitialOffsets,
                                watermarksPeriodic,
                                watermarksPunctuated,
                                runtimeContext.getProcessingTimeService(),
@@ -151,7 +149,6 @@ public class FlinkKafkaConsumer010<T> extends 
FlinkKafkaConsumer09<T> {
                                deserializer,
                                properties,
                                pollTimeout,
-                               startupMode,
                                useMetrics);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index efb6f88..da6ecd0 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -22,7 +22,6 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -32,8 +31,7 @@ import org.apache.flink.util.SerializedValue;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
-import java.util.HashMap;
-import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -48,8 +46,7 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 
        public Kafka010Fetcher(
                        SourceContext<T> sourceContext,
-                       List<KafkaTopicPartition> assignedPartitions,
-                       HashMap<KafkaTopicPartition, Long> 
restoredSnapshotState,
+                       Map<KafkaTopicPartition, Long> 
assignedPartitionsWithInitialOffsets,
                        SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
                        SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
                        ProcessingTimeService processingTimeProvider,
@@ -61,13 +58,11 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
                        KeyedDeserializationSchema<T> deserializer,
                        Properties kafkaProperties,
                        long pollTimeout,
-                       StartupMode startupMode,
                        boolean useMetrics) throws Exception
        {
                super(
                                sourceContext,
-                               assignedPartitions,
-                               restoredSnapshotState,
+                               assignedPartitionsWithInitialOffsets,
                                watermarksPeriodic,
                                watermarksPunctuated,
                                processingTimeProvider,
@@ -79,7 +74,6 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
                                deserializer,
                                kafkaProperties,
                                pollTimeout,
-                               startupMode,
                                useMetrics);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
index 1e0bc5b..0fda9a6 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka.internal;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -39,12 +40,12 @@ public class KafkaConsumerCallBridge010 extends 
KafkaConsumerCallBridge {
        }
 
        @Override
-       public void seekPartitionsToBeginning(KafkaConsumer<?, ?> consumer, 
List<TopicPartition> partitions) {
-               consumer.seekToBeginning(partitions);
+       public void seekPartitionToBeginning(KafkaConsumer<?, ?> consumer, 
TopicPartition partition) {
+               consumer.seekToBeginning(Collections.singletonList(partition));
        }
 
        @Override
-       public void seekPartitionsToEnd(KafkaConsumer<?, ?> consumer, 
List<TopicPartition> partitions) {
-               consumer.seekToEnd(partitions);
+       public void seekPartitionToEnd(KafkaConsumer<?, ?> consumer, 
TopicPartition partition) {
+               consumer.seekToEnd(Collections.singletonList(partition));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 98aa28a..17ba712 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -24,10 +24,10 @@ import 
org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.internal.Handover;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
 import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
@@ -118,13 +118,13 @@ public class Kafka010FetcherTest {
 
                @SuppressWarnings("unchecked")
                SourceContext<String> sourceContext = mock(SourceContext.class);
-               List<KafkaTopicPartition> topics = 
Collections.singletonList(new KafkaTopicPartition("test", 42));
+               Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+                       Collections.singletonMap(new 
KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
 
                final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
                                sourceContext,
-                               topics,
-                               null, /* no restored state */
+                               partitionsWithInitialOffsets,
                                null, /* periodic assigner */
                                null, /* punctuated assigner */
                                new TestProcessingTimeService(),
@@ -136,7 +136,6 @@ public class Kafka010FetcherTest {
                                schema,
                                new Properties(),
                                0L,
-                               StartupMode.GROUP_OFFSETS,
                                false);
 
                // ----- run the fetcher -----
@@ -256,13 +255,13 @@ public class Kafka010FetcherTest {
 
                @SuppressWarnings("unchecked")
                SourceContext<String> sourceContext = mock(SourceContext.class);
-               List<KafkaTopicPartition> topics = 
Collections.singletonList(new KafkaTopicPartition("test", 42));
+               Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+                       Collections.singletonMap(new 
KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
 
                final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
                                sourceContext,
-                               topics,
-                               null, /* no restored state */
+                               partitionsWithInitialOffsets,
                                null, /* periodic assigner */
                                null, /* punctuated assigner */
                                new TestProcessingTimeService(),
@@ -274,7 +273,6 @@ public class Kafka010FetcherTest {
                                schema,
                                new Properties(),
                                0L,
-                               StartupMode.GROUP_OFFSETS,
                                false);
 
                // ----- run the fetcher -----
@@ -372,13 +370,13 @@ public class Kafka010FetcherTest {
                // ----- build a fetcher -----
 
                BlockingSourceContext<String> sourceContext = new 
BlockingSourceContext<>();
-               List<KafkaTopicPartition> topics = 
Collections.singletonList(new KafkaTopicPartition(topic, partition));
+               Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+                       Collections.singletonMap(new KafkaTopicPartition(topic, 
partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
 
                final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
                                sourceContext,
-                               topics,
-                               null, /* no restored state */
+                               partitionsWithInitialOffsets,
                                null, /* periodic watermark extractor */
                                null, /* punctuated watermark extractor */
                                new TestProcessingTimeService(),
@@ -390,7 +388,6 @@ public class Kafka010FetcherTest {
                                schema,
                                new Properties(),
                                0L,
-                               StartupMode.GROUP_OFFSETS,
                                false);
 
                // ----- run the fetcher -----

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index c0e4dd7..bf7ed02 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -45,10 +45,10 @@ import java.net.InetAddress;
 import java.net.URL;
 import java.net.UnknownHostException;
 import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
 import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 
@@ -194,19 +194,23 @@ public class FlinkKafkaConsumer08<T> extends 
FlinkKafkaConsumerBase<T> {
        @Override
        protected AbstractFetcher<T, ?> createFetcher(
                        SourceContext<T> sourceContext,
-                       List<KafkaTopicPartition> thisSubtaskPartitions,
-                       HashMap<KafkaTopicPartition, Long> 
restoredSnapshotState,
+                       Map<KafkaTopicPartition, Long> 
assignedPartitionsWithInitialOffsets,
                        SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
                        SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
                        StreamingRuntimeContext runtimeContext) throws 
Exception {
 
                boolean useMetrics = 
!Boolean.valueOf(kafkaProperties.getProperty(KEY_DISABLE_METRICS, "false"));
 
-               return new Kafka08Fetcher<>(sourceContext,
-                               thisSubtaskPartitions, restoredSnapshotState,
-                               watermarksPeriodic, watermarksPunctuated,
-                               runtimeContext, deserializer, kafkaProperties,
-                               autoCommitInterval, startupMode, useMetrics);
+               return new Kafka08Fetcher<>(
+                               sourceContext,
+                               assignedPartitionsWithInitialOffsets,
+                               watermarksPeriodic,
+                               watermarksPunctuated,
+                               runtimeContext,
+                               deserializer,
+                               kafkaProperties,
+                               autoCommitInterval,
+                               useMetrics);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index ad520d8..de201e5 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -91,27 +91,23 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, 
TopicAndPartition> {
 
        public Kafka08Fetcher(
                        SourceContext<T> sourceContext,
-                       List<KafkaTopicPartition> assignedPartitions,
-                       HashMap<KafkaTopicPartition, Long> 
restoredSnapshotState,
+                       Map<KafkaTopicPartition, Long> 
assignedPartitionsWithInitialOffsets,
                        SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
                        SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
                        StreamingRuntimeContext runtimeContext,
                        KeyedDeserializationSchema<T> deserializer,
                        Properties kafkaProperties,
                        long autoCommitInterval,
-                       StartupMode startupMode,
                        boolean useMetrics) throws Exception
        {
                super(
                                sourceContext,
-                               assignedPartitions,
-                               restoredSnapshotState,
+                               assignedPartitionsWithInitialOffsets,
                                watermarksPeriodic,
                                watermarksPunctuated,
                                runtimeContext.getProcessingTimeService(),
                                
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
                                runtimeContext.getUserCodeClassLoader(),
-                               startupMode,
                                useMetrics);
 
                this.deserializer = checkNotNull(deserializer);
@@ -122,7 +118,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, 
TopicAndPartition> {
                this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
 
                // initially, all these partitions are not assigned to a 
specific broker connection
-               for (KafkaTopicPartitionState<TopicAndPartition> partition : 
subscribedPartitions()) {
+               for (KafkaTopicPartitionState<TopicAndPartition> partition : 
subscribedPartitionStates()) {
                        unassignedPartitionsQueue.add(partition);
                }
        }
@@ -146,43 +142,32 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, 
TopicAndPartition> {
                PeriodicOffsetCommitter periodicCommitter = null;
                try {
 
-                       // if we're not restored from a checkpoint, all 
partitions will not have their offset set;
-                       // depending on the configured startup mode, 
accordingly set the starting offsets
-                       if (!isRestored) {
-                               switch (startupMode) {
-                                       case EARLIEST:
-                                               for 
(KafkaTopicPartitionState<TopicAndPartition> partition : 
subscribedPartitions()) {
-                                                       
partition.setOffset(OffsetRequest.EarliestTime());
-                                               }
-                                               break;
-                                       case LATEST:
-                                               for 
(KafkaTopicPartitionState<TopicAndPartition> partition : 
subscribedPartitions()) {
-                                                       
partition.setOffset(OffsetRequest.LatestTime());
-                                               }
-                                               break;
-                                       default:
-                                       case GROUP_OFFSETS:
-                                               List<KafkaTopicPartition> 
partitions = new ArrayList<>(subscribedPartitions().length);
-                                               for 
(KafkaTopicPartitionState<TopicAndPartition> partition : 
subscribedPartitions()) {
-                                                       
partitions.add(partition.getKafkaTopicPartition());
-                                               }
-
-                                               Map<KafkaTopicPartition, Long> 
zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitions);
-                                               for 
(KafkaTopicPartitionState<TopicAndPartition> partition : 
subscribedPartitions()) {
-                                                       Long offset = 
zkOffsets.get(partition.getKafkaTopicPartition());
-                                                       if (offset != null) {
-                                                               // the 
committed offset in ZK represents the next record to process,
-                                                               // so we 
subtract it by 1 to correctly represent internal state
-                                                               
partition.setOffset(offset - 1);
-                                                       } else {
-                                                               // if we can't 
find an offset for a partition in ZK when using GROUP_OFFSETS,
-                                                               // we default 
to "auto.offset.reset" like the Kafka high-level consumer
-                                                               LOG.warn("No 
group offset can be found for partition {} in Zookeeper;" +
-                                                                       " 
resetting starting offset to 'auto.offset.reset'", partition);
-
-                                                               
partition.setOffset(invalidOffsetBehavior);
-                                                       }
-                                               }
+                       // offsets in the state may still be placeholder 
sentinel values if we are starting fresh, or the
+                       // checkpoint / savepoint state we were restored with 
had not completely been replaced with actual offset
+                       // values yet; replace those with actual offsets, 
according to what the sentinel values represent.
+                       for (KafkaTopicPartitionState<TopicAndPartition> 
partition : subscribedPartitionStates()) {
+                               if (partition.getOffset() == 
KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
+                                       // this will be replaced by an actual 
offset in SimpleConsumerThread
+                                       
partition.setOffset(OffsetRequest.EarliestTime());
+                               } else if (partition.getOffset() == 
KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
+                                       // this will be replaced by an actual 
offset in SimpleConsumerThread
+                                       
partition.setOffset(OffsetRequest.LatestTime());
+                               } else if (partition.getOffset() == 
KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
+                                       Long committedOffset = 
zookeeperOffsetHandler.getCommittedOffset(partition.getKafkaTopicPartition());
+                                       if (committedOffset != null) {
+                                               // the committed offset in ZK 
represents the next record to process,
+                                               // so we subtract it by 1 to 
correctly represent internal state
+                                               
partition.setOffset(committedOffset - 1);
+                                       } else {
+                                               // if we can't find an offset 
for a partition in ZK when using GROUP_OFFSETS,
+                                               // we default to 
"auto.offset.reset" like the Kafka high-level consumer
+                                               LOG.warn("No group offset can 
be found for partition {} in Zookeeper;" +
+                                                       " resetting starting 
offset to 'auto.offset.reset'", partition);
+
+                                               
partition.setOffset(invalidOffsetBehavior);
+                                       }
+                               } else {
+                                       // the partition already has a specific 
start offset and is ready to be consumed
                                }
                        }
 
@@ -191,7 +176,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, 
TopicAndPartition> {
                                LOG.info("Starting periodic offset committer, 
with commit interval of {}ms", autoCommitInterval);
 
                                periodicCommitter = new 
PeriodicOffsetCommitter(zookeeperOffsetHandler, 
-                                               subscribedPartitions(), 
errorHandler, autoCommitInterval);
+                                               subscribedPartitionStates(), 
errorHandler, autoCommitInterval);
                                periodicCommitter.setName("Periodic Kafka 
partition offset committer");
                                periodicCommitter.setDaemon(true);
                                periodicCommitter.start();
@@ -388,7 +373,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, 
TopicAndPartition> {
                }
 
                // Set committed offsets in topic partition state
-               KafkaTopicPartitionState<TopicAndPartition>[] partitions = 
subscribedPartitions();
+               KafkaTopicPartitionState<TopicAndPartition>[] partitions = 
subscribedPartitionStates();
                for (KafkaTopicPartitionState<TopicAndPartition> partition : 
partitions) {
                        Long offset = 
offsets.get(partition.getKafkaTopicPartition());
                        if (offset != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
index 8f2ef09..cec980f 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -30,8 +30,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -96,22 +94,12 @@ public class ZookeeperOffsetHandler {
        }
 
        /**
-        * @param partitions The partitions to read offsets for.
+        * @param partition The partition to read offset for.
         * @return The mapping from partition to offset.
         * @throws Exception This method forwards exceptions.
         */
-       public Map<KafkaTopicPartition, Long> 
getCommittedOffsets(List<KafkaTopicPartition> partitions) throws Exception {
-               Map<KafkaTopicPartition, Long> ret = new 
HashMap<>(partitions.size());
-               for (KafkaTopicPartition tp : partitions) {
-                       Long offset = getOffsetFromZooKeeper(curatorClient, 
groupId, tp.getTopic(), tp.getPartition());
-
-                       if (offset != null) {
-                               LOG.info("Offset for TopicPartition {}:{} was 
set to {} in ZooKeeper. Seeking fetcher to that position.",
-                                               tp.getTopic(), 
tp.getPartition(), offset);
-                               ret.put(tp, offset);
-                       }
-               }
-               return ret;
+       public Long getCommittedOffset(KafkaTopicPartition partition) throws 
Exception {
+               return getOffsetFromZooKeeper(curatorClient, groupId, 
partition.getTopic(), partition.getPartition());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 9a61b91..c7236a2 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -171,8 +171,7 @@ public class FlinkKafkaConsumer09<T> extends 
FlinkKafkaConsumerBase<T> {
        @Override
        protected AbstractFetcher<T, ?> createFetcher(
                        SourceContext<T> sourceContext,
-                       List<KafkaTopicPartition> thisSubtaskPartitions,
-                       HashMap<KafkaTopicPartition, Long> 
restoredSnapshotState,
+                       Map<KafkaTopicPartition, Long> 
assignedPartitionsWithInitialOffsets,
                        SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
                        SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
                        StreamingRuntimeContext runtimeContext) throws 
Exception {
@@ -181,8 +180,7 @@ public class FlinkKafkaConsumer09<T> extends 
FlinkKafkaConsumerBase<T> {
 
                return new Kafka09Fetcher<>(
                                sourceContext,
-                               thisSubtaskPartitions,
-                               restoredSnapshotState,
+                               assignedPartitionsWithInitialOffsets,
                                watermarksPeriodic,
                                watermarksPunctuated,
                                runtimeContext.getProcessingTimeService(),
@@ -194,7 +192,6 @@ public class FlinkKafkaConsumer09<T> extends 
FlinkKafkaConsumerBase<T> {
                                deserializer,
                                properties,
                                pollTimeout,
-                               startupMode,
                                useMetrics);
                
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index b7c9bc2..c389486 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -22,7 +22,6 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
@@ -71,8 +70,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> {
 
        public Kafka09Fetcher(
                        SourceContext<T> sourceContext,
-                       List<KafkaTopicPartition> assignedPartitions,
-                       HashMap<KafkaTopicPartition, Long> 
restoredSnapshotState,
+                       Map<KafkaTopicPartition, Long> 
assignedPartitionsWithInitialOffsets,
                        SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
                        SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
                        ProcessingTimeService processingTimeProvider,
@@ -84,19 +82,16 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> {
                        KeyedDeserializationSchema<T> deserializer,
                        Properties kafkaProperties,
                        long pollTimeout,
-                       StartupMode startupMode,
                        boolean useMetrics) throws Exception
        {
                super(
                                sourceContext,
-                               assignedPartitions,
-                               restoredSnapshotState,
+                               assignedPartitionsWithInitialOffsets,
                                watermarksPeriodic,
                                watermarksPunctuated,
                                processingTimeProvider,
                                autoWatermarkInterval,
                                userCodeClassLoader,
-                               startupMode,
                                useMetrics);
 
                this.deserializer = deserializer;
@@ -114,13 +109,11 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> {
                                LOG,
                                handover,
                                kafkaProperties,
-                               subscribedPartitions(),
+                               subscribedPartitionStates(),
                                kafkaMetricGroup,
                                createCallBridge(),
                                getFetcherName() + " for " + 
taskNameWithSubtasks,
                                pollTimeout,
-                               startupMode,
-                               isRestored,
                                useMetrics);
        }
 
@@ -142,7 +135,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> {
                                final ConsumerRecords<byte[], byte[]> records = 
handover.pollNext();
 
                                // get the records for each topic partition
-                               for (KafkaTopicPartitionState<TopicPartition> 
partition : subscribedPartitions()) {
+                               for (KafkaTopicPartitionState<TopicPartition> 
partition : subscribedPartitionStates()) {
 
                                        List<ConsumerRecord<byte[], byte[]>> 
partitionRecords =
                                                        
records.records(partition.getKafkaPartitionHandle());
@@ -226,7 +219,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> {
 
        @Override
        public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> 
offsets) throws Exception {
-               KafkaTopicPartitionState<TopicPartition>[] partitions = 
subscribedPartitions();
+               KafkaTopicPartitionState<TopicPartition>[] partitions = 
subscribedPartitionStates();
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new 
HashMap<>(partitions.length);
 
                for (KafkaTopicPartitionState<TopicPartition> partition : 
partitions) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
index a97b3cf..37ba34c 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
@@ -39,16 +39,12 @@ public class KafkaConsumerCallBridge {
                consumer.assign(topicPartitions);
        }
 
-       public void seekPartitionsToBeginning(KafkaConsumer<?, ?> consumer, 
List<TopicPartition> partitions) {
-               for (TopicPartition partition : partitions) {
-                       consumer.seekToBeginning(partition);
-               }
+       public void seekPartitionToBeginning(KafkaConsumer<?, ?> consumer, 
TopicPartition partition) {
+               consumer.seekToBeginning(partition);
        }
 
-       public void seekPartitionsToEnd(KafkaConsumer<?, ?> consumer, 
List<TopicPartition> partitions) {
-               for (TopicPartition partition : partitions) {
-                       consumer.seekToEnd(partition);
-               }
+       public void seekPartitionToEnd(KafkaConsumer<?, ?> consumer, 
TopicPartition partition) {
+               consumer.seekToEnd(partition);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index 03fe2c6..cbe1551 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.connectors.kafka.internal;
 
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -69,7 +69,7 @@ public class KafkaConsumerThread extends Thread {
        private final Properties kafkaProperties;
 
        /** The partitions that this consumer reads from */ 
-       private final KafkaTopicPartitionState<TopicPartition>[] 
subscribedPartitions;
+       private final KafkaTopicPartitionState<TopicPartition>[] 
subscribedPartitionStates;
 
        /** We get this from the outside to publish metrics. **/
        private final MetricGroup kafkaMetricGroup;
@@ -80,12 +80,6 @@ public class KafkaConsumerThread extends Thread {
        /** The maximum number of milliseconds to wait for a fetch batch */
        private final long pollTimeout;
 
-       /** The configured startup mode (relevant only if we're restored from 
checkpoint / savepoint) */
-       private final StartupMode startupMode;
-
-       /** Flag whether or not we're restored from checkpoint / savepoint */
-       private final boolean isRestored;
-
        /** Flag whether to add Kafka's metrics to the Flink metrics */
        private final boolean useMetrics;
 
@@ -103,13 +97,11 @@ public class KafkaConsumerThread extends Thread {
                        Logger log,
                        Handover handover,
                        Properties kafkaProperties,
-                       KafkaTopicPartitionState<TopicPartition>[] 
subscribedPartitions,
+                       KafkaTopicPartitionState<TopicPartition>[] 
subscribedPartitionStates,
                        MetricGroup kafkaMetricGroup,
                        KafkaConsumerCallBridge consumerCallBridge,
                        String threadName,
                        long pollTimeout,
-                       StartupMode startupMode,
-                       boolean isRestored,
                        boolean useMetrics) {
 
                super(threadName);
@@ -120,21 +112,8 @@ public class KafkaConsumerThread extends Thread {
                this.kafkaProperties = checkNotNull(kafkaProperties);
                this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
                this.consumerCallBridge = checkNotNull(consumerCallBridge);
-               this.startupMode = checkNotNull(startupMode);
-
-               this.subscribedPartitions = checkNotNull(subscribedPartitions);
-               this.isRestored = isRestored;
-
-               // if we are restoring from a checkpoint / savepoint, all
-               // subscribed partitions' state should have defined offsets
-               if (isRestored) {
-                       for (KafkaTopicPartitionState<TopicPartition> 
subscribedPartition : subscribedPartitions) {
-                               if (!subscribedPartition.isOffsetDefined()) {
-                                       throw new 
IllegalArgumentException("Restoring from a checkpoint / savepoint, but found a 
" +
-                                               "partition state " + 
subscribedPartition + " that does not have a defined offset.");
-                               }
-                       }
-               }
+
+               this.subscribedPartitionStates = 
checkNotNull(subscribedPartitionStates);
 
                this.pollTimeout = pollTimeout;
                this.useMetrics = useMetrics;
@@ -173,7 +152,7 @@ public class KafkaConsumerThread extends Thread {
                        final OffsetCommitCallback offsetCommitCallback = new 
CommitCallback();
 
                        // tell the consumer which partitions to work with
-                       consumerCallBridge.assignPartitions(consumer, 
convertKafkaPartitions(subscribedPartitions));
+                       consumerCallBridge.assignPartitions(consumer, 
convertKafkaPartitions(subscribedPartitionStates));
 
                        // register Kafka's very own metrics in Flink's metric 
reporters
                        if (useMetrics) {
@@ -195,39 +174,23 @@ public class KafkaConsumerThread extends Thread {
                                return;
                        }
 
-                       if (isRestored) {
-                               for (KafkaTopicPartitionState<TopicPartition> 
partition : subscribedPartitions) {
-                                       log.info("Partition {} has restored 
initial offsets {} from checkpoint / savepoint; seeking the consumer " +
-                                               "to position {}", 
partition.getKafkaPartitionHandle(), partition.getOffset(), 
partition.getOffset() + 1);
-
-                                       
consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
-                               }
-                       } else {
-                               List<TopicPartition> partitionList = 
convertKafkaPartitions(subscribedPartitions);
-
-                               // fetch offsets from Kafka, depending on the 
configured startup mode
-                               switch (startupMode) {
-                                       case EARLIEST:
-                                               log.info("Setting starting 
point as earliest offset for partitions {}", partitionList);
-
-                                               
consumerCallBridge.seekPartitionsToBeginning(consumer, partitionList);
-                                               break;
-                                       case LATEST:
-                                               log.info("Setting starting 
point as latest offset for partitions {}", partitionList);
-
-                                               
consumerCallBridge.seekPartitionsToEnd(consumer, partitionList);
-                                               break;
-                                       default:
-                                       case GROUP_OFFSETS:
-                                               log.info("Using group offsets 
in Kafka of group.id {} as starting point for partitions {}",
-                                                       
kafkaProperties.getProperty("group.id"), partitionList);
-                               }
+                       // offsets in the state may still be placeholder 
sentinel values if we are starting fresh, or the
+                       // checkpoint / savepoint state we were restored with 
had not completely been replaced with actual offset
+                       // values yet; replace those with actual offsets, 
according to what the sentinel values represent.
+                       for (KafkaTopicPartitionState<TopicPartition> partition 
: subscribedPartitionStates) {
+                               if (partition.getOffset() == 
KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
+                                       
consumerCallBridge.seekPartitionToBeginning(consumer, 
partition.getKafkaPartitionHandle());
+                                       
partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1);
+                               } else if (partition.getOffset() == 
KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
+                                       
consumerCallBridge.seekPartitionToEnd(consumer, 
partition.getKafkaPartitionHandle());
+                                       
partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1);
+                               } else if (partition.getOffset() == 
KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
+                                       // the KafkaConsumer by default will 
automatically seek the consumer position
+                                       // to the committed group offset, so we 
do not need to do it.
 
-                               // on startup, all partition states will not 
have defined offsets;
-                               // set the initial states with the offsets 
fetched from Kafka
-                               for (KafkaTopicPartitionState<TopicPartition> 
partition : subscribedPartitions) {
-                                       // the fetched offset represents the 
next record to process, so we need to subtract it by 1
                                        
partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1);
+                               } else {
+                                       
consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
                                }
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index abd75cc..49144e6 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -24,10 +24,10 @@ import 
org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.internal.Handover;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
 import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
@@ -118,13 +118,13 @@ public class Kafka09FetcherTest {
 
                @SuppressWarnings("unchecked")
                SourceContext<String> sourceContext = mock(SourceContext.class);
-               List<KafkaTopicPartition> topics = 
Collections.singletonList(new KafkaTopicPartition("test", 42));
+               Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+                       Collections.singletonMap(new 
KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
 
                final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
                                sourceContext,
-                               topics,
-                               null, /* no restored state */
+                               partitionsWithInitialOffsets,
                                null, /* periodic watermark extractor */
                                null, /* punctuated watermark extractor */
                                new TestProcessingTimeService(),
@@ -136,7 +136,6 @@ public class Kafka09FetcherTest {
                                schema,
                                new Properties(),
                                0L,
-                               StartupMode.GROUP_OFFSETS,
                                false);
 
                // ----- run the fetcher -----
@@ -256,13 +255,13 @@ public class Kafka09FetcherTest {
 
                @SuppressWarnings("unchecked")
                SourceContext<String> sourceContext = mock(SourceContext.class);
-               List<KafkaTopicPartition> topics = 
Collections.singletonList(new KafkaTopicPartition("test", 42));
+               Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+                       Collections.singletonMap(new 
KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
 
                final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
                                sourceContext,
-                               topics,
-                               null, /* no restored state */
+                               partitionsWithInitialOffsets,
                                null, /* periodic watermark extractor */
                                null, /* punctuated watermark extractor */
                                new TestProcessingTimeService(),
@@ -274,7 +273,6 @@ public class Kafka09FetcherTest {
                                schema,
                                new Properties(),
                                0L,
-                               StartupMode.GROUP_OFFSETS,
                                false);
 
                // ----- run the fetcher -----
@@ -372,13 +370,13 @@ public class Kafka09FetcherTest {
                // ----- build a fetcher -----
 
                BlockingSourceContext<String> sourceContext = new 
BlockingSourceContext<>();
-               List<KafkaTopicPartition> topics = 
Collections.singletonList(new KafkaTopicPartition(topic, partition));
+               Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+                       Collections.singletonMap(new KafkaTopicPartition(topic, 
partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
 
                final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
                                sourceContext,
-                               topics,
-                               null, /* no restored state */
+                               partitionsWithInitialOffsets,
                                null, /* periodic watermark extractor */
                                null, /* punctuated watermark extractor */
                                new TestProcessingTimeService(),
@@ -390,7 +388,6 @@ public class Kafka09FetcherTest {
                                schema,
                                new Properties(),
                                0L,
-                               StartupMode.GROUP_OFFSETS,
                                false);
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 1121d1b..144ede8 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -40,15 +40,11 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -90,8 +86,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
        /** The schema to convert between Kafka's byte messages, and Flink's 
objects */
        protected final KeyedDeserializationSchema<T> deserializer;
 
-       /** The set of topic partitions that the source will read */
-       private List<KafkaTopicPartition> subscribedPartitions;
+       /** The set of topic partitions that the source will read, with their 
initial offsets to start reading from */
+       private Map<KafkaTopicPartition, Long> 
subscribedPartitionsToStartOffsets;
        
        /** Optional timestamp extractor / watermark generator that will be run 
per Kafka partition,
         * to exploit per-partition timestamp characteristics.
@@ -138,17 +134,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                this.deserializer = checkNotNull(deserializer, 
"valueDeserializer");
        }
 
-       /**
-        * This method must be called from the subclasses, to set the list of 
all subscribed partitions
-        * that this consumer will fetch from (across all subtasks).
-        * 
-        * @param allSubscribedPartitions The list of all partitions that all 
subtasks together should fetch from.
-        */
-       protected void setSubscribedPartitions(List<KafkaTopicPartition> 
allSubscribedPartitions) {
-               checkNotNull(allSubscribedPartitions);
-               this.subscribedPartitions = 
Collections.unmodifiableList(allSubscribedPartitions);
-       }
-
        // 
------------------------------------------------------------------------
        //  Configuration
        // 
------------------------------------------------------------------------
@@ -263,17 +248,67 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
        // 
------------------------------------------------------------------------
 
        @Override
+       public void open(Configuration configuration) {
+               // Decide which partitions this subtask reads and the offset each one starts from.
+               // Offsets from restored state take precedence; otherwise the partitions are spread
+               // over the parallel subtasks and start from the sentinel of the configured startup mode.
+               List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
+
+               // The partition list is null-checked below, so the map must not be pre-sized from it:
+               // calling size() on a null list would throw a NullPointerException before that check runs.
+               subscribedPartitionsToStartOffsets = new HashMap<>();
+
+               if (kafkaTopicPartitions != null) {
+                       if (restoredState != null) {
+                               // only partitions that are present in the restored state are subscribed to
+                               for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
+                                       if (restoredState.containsKey(kafkaTopicPartition)) {
+                                               subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition));
+                                       }
+                               }
+
+                               LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
+                                       getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
+                       } else {
+                               initializeSubscribedPartitionsToStartOffsets(
+                                       subscribedPartitionsToStartOffsets,
+                                       kafkaTopicPartitions,
+                                       getRuntimeContext().getIndexOfThisSubtask(),
+                                       getRuntimeContext().getNumberOfParallelSubtasks(),
+                                       startupMode);
+
+                               if (!subscribedPartitionsToStartOffsets.isEmpty()) {
+                                       switch (startupMode) {
+                                               case EARLIEST:
+                                                       LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
+                                                               getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets.keySet());
+                                                       break;
+                                               case LATEST:
+                                                       LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
+                                                               getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets.keySet());
+                                                       break;
+                                               default:
+                                               case GROUP_OFFSETS:
+                                                       LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
+                                                               getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets.keySet());
+                                       }
+                               }
+                       }
+               }
+       }
+
+       @Override
        public void run(SourceContext<T> sourceContext) throws Exception {
-               if (subscribedPartitions == null) {
+               if (subscribedPartitionsToStartOffsets == null) {
                        throw new Exception("The partitions were not set for 
the consumer");
                }
 
                // we need only do work, if we actually have partitions assigned
-               if (!subscribedPartitions.isEmpty()) {
+               if (!subscribedPartitionsToStartOffsets.isEmpty()) {
 
                        // create the fetcher that will communicate with the 
Kafka brokers
                        final AbstractFetcher<T, ?> fetcher = createFetcher(
-                                       sourceContext, subscribedPartitions, 
restoredState,
+                                       sourceContext, 
subscribedPartitionsToStartOffsets,
                                        periodicWatermarkAssigner, 
punctuatedWatermarkAssigner,
                                        (StreamingRuntimeContext) 
getRuntimeContext());
 
@@ -327,15 +362,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
        }
 
        @Override
-       public void open(Configuration configuration) {
-               List<KafkaTopicPartition> kafkaTopicPartitions = 
getKafkaPartitions(topics);
-
-               if (kafkaTopicPartitions != null) {
-                       assignTopicPartitions(kafkaTopicPartitions);
-               }
-       }
-
-       @Override
        public void close() throws Exception {
                // pretty much the same logic as cancelling
                try {
@@ -386,18 +412,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                        if (fetcher == null) {
                                // the fetcher has not yet been initialized, 
which means we need to return the
                                // originally restored offsets or the assigned 
partitions
-
-                               if (restoredState != null) {
-
-                                       for (Map.Entry<KafkaTopicPartition, 
Long> kafkaTopicPartitionLongEntry : restoredState.entrySet()) {
-                                               offsetsStateForCheckpoint.add(
-                                                               
Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), 
kafkaTopicPartitionLongEntry.getValue()));
-                                       }
-                               } else if (subscribedPartitions != null) {
-                                       for (KafkaTopicPartition 
subscribedPartition : subscribedPartitions) {
-                                               offsetsStateForCheckpoint.add(
-                                                               
Tuple2.of(subscribedPartition, KafkaTopicPartitionState.OFFSET_NOT_SET));
-                                       }
+                               for (Map.Entry<KafkaTopicPartition, Long> 
subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
+                                       
offsetsStateForCheckpoint.add(Tuple2.of(subscribedPartition.getKey(), 
subscribedPartition.getValue()));
                                }
 
                                // the map cannot be asynchronously updated, 
because only one checkpoint call can happen
@@ -493,7 +509,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
         * data, and emits it into the data streams.
         * 
         * @param sourceContext The source context to emit data to.
-        * @param thisSubtaskPartitions The set of partitions that this subtask 
should handle.
+        * @param subscribedPartitionsToStartOffsets The set of partitions that 
this subtask should handle, with their start offsets.
         * @param watermarksPeriodic Optional, a serialized timestamp extractor 
/ periodic watermark generator.
         * @param watermarksPunctuated Optional, a serialized timestamp 
extractor / punctuated watermark generator.
         * @param runtimeContext The task's runtime context.
@@ -504,8 +520,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
         */
        protected abstract AbstractFetcher<T, ?> createFetcher(
                        SourceContext<T> sourceContext,
-                       List<KafkaTopicPartition> thisSubtaskPartitions,
-                       HashMap<KafkaTopicPartition, Long> 
restoredSnapshotState,
+                       Map<KafkaTopicPartition, Long> 
subscribedPartitionsToStartOffsets,
                        SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
                        SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
                        StreamingRuntimeContext runtimeContext) throws 
Exception;
@@ -525,60 +540,33 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       private void assignTopicPartitions(List<KafkaTopicPartition> 
kafkaTopicPartitions) {
-               subscribedPartitions = new ArrayList<>();
-
-               if (restoredState != null) {
-                       for (KafkaTopicPartition kafkaTopicPartition : 
kafkaTopicPartitions) {
-                               if 
(restoredState.containsKey(kafkaTopicPartition)) {
-                                       
subscribedPartitions.add(kafkaTopicPartition);
-                               }
-                       }
-               } else {
-                       Collections.sort(kafkaTopicPartitions, new 
Comparator<KafkaTopicPartition>() {
-                               @Override
-                               public int compare(KafkaTopicPartition o1, 
KafkaTopicPartition o2) {
-                                       int topicComparison = 
o1.getTopic().compareTo(o2.getTopic());
-
-                                       if (topicComparison == 0) {
-                                               return o1.getPartition() - 
o2.getPartition();
-                                       } else {
-                                               return topicComparison;
-                                       }
-                               }
-                       });
-
-                       for (int i = 
getRuntimeContext().getIndexOfThisSubtask(); i < kafkaTopicPartitions.size(); i 
+= getRuntimeContext().getNumberOfParallelSubtasks()) {
-                               
subscribedPartitions.add(kafkaTopicPartitions.get(i));
-                       }
-               }
-       }
-
        /**
-        * Selects which of the given partitions should be handled by a 
specific consumer,
-        * given a certain number of consumers.
-        * 
-        * @param allPartitions The partitions to select from
-        * @param numConsumers The number of consumers
-        * @param consumerIndex The index of the specific consumer
-        * 
-        * @return The sublist of partitions to be handled by that consumer.
+        * Initializes {@link 
FlinkKafkaConsumerBase#subscribedPartitionsToStartOffsets} with appropriate
+        * values. The method decides which partitions this consumer instance 
should subscribe to, and also
+        * sets the initial offset each subscribed partition should be started 
from based on the configured startup mode.
+        *
+        * @param subscribedPartitionsToStartOffsets the subscribedPartitionsToStartOffsets to initialize
+        * @param kafkaTopicPartitions the complete list of all Kafka partitions
+        * @param indexOfThisSubtask the index of this consumer instance
+        * @param numParallelSubtasks total number of parallel consumer 
instances
+        * @param startupMode the configured startup mode for the consumer
+        *
+        * Note: This method is also exposed for testing.
         */
-       protected static List<KafkaTopicPartition> assignPartitions(
-                       List<KafkaTopicPartition> allPartitions,
-                       int numConsumers, int consumerIndex) {
-               final List<KafkaTopicPartition> thisSubtaskPartitions = new 
ArrayList<>(
-                               allPartitions.size() / numConsumers + 1);
-
-               for (int i = 0; i < allPartitions.size(); i++) {
-                       if (i % numConsumers == consumerIndex) {
-                               thisSubtaskPartitions.add(allPartitions.get(i));
+       protected static void initializeSubscribedPartitionsToStartOffsets(
+                       Map<KafkaTopicPartition, Long> 
subscribedPartitionsToStartOffsets,
+                       List<KafkaTopicPartition> kafkaTopicPartitions,
+                       int indexOfThisSubtask,
+                       int numParallelSubtasks,
+                       StartupMode startupMode) {
+
+               for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
+                       if (i % numParallelSubtasks == indexOfThisSubtask) {
+                               
subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), 
startupMode.getStateSentinel());
                        }
                }
-               
-               return thisSubtaskPartitions;
        }
-       
+
        /**
         * Logs the partition information in INFO level.
         * 
@@ -607,8 +595,17 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
        }
 
        @VisibleForTesting
-       List<KafkaTopicPartition> getSubscribedPartitions() {
-               return subscribedPartitions;
+       void setSubscribedPartitions(List<KafkaTopicPartition> 
allSubscribedPartitions) {
+               checkNotNull(allSubscribedPartitions);
+               this.subscribedPartitionsToStartOffsets = new HashMap<>();
+               for (KafkaTopicPartition partition : allSubscribedPartitions) {
+                       this.subscribedPartitionsToStartOffsets.put(partition, 
null);
+               }
+       }
+
+       @VisibleForTesting
+       Map<KafkaTopicPartition, Long> getSubscribedPartitionsToStartOffsets() {
+               return subscribedPartitionsToStartOffsets;
        }
 
        @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
index 331c1a6..f796e62 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
@@ -16,18 +16,30 @@
  */
 package org.apache.flink.streaming.connectors.kafka.config;
 
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+
 /**
  * Startup modes for the Kafka Consumer.
  */
 public enum StartupMode {
 
        /** Start from committed offsets in ZK / Kafka brokers of a specific 
consumer group (default) */
-       GROUP_OFFSETS,
+       GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),
 
        /** Start from the earliest offset possible */
-       EARLIEST,
+       EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),
 
        /** Start from the latest offset */
-       LATEST
-       
+       LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+
+       /** The sentinel offset value corresponding to this startup mode */
+       private long stateSentinel;
+
+       StartupMode(long stateSentinel) {
+               this.stateSentinel = stateSentinel;
+       }
+
+       public long getStateSentinel() {
+               return stateSentinel;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index b27e996..e021881 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -26,12 +26,10 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -63,20 +61,14 @@ public abstract class AbstractFetcher<T, KPH> {
        protected final Object checkpointLock;
 
        /** All partitions (and their state) that this fetcher is subscribed to 
*/
-       private final KafkaTopicPartitionState<KPH>[] allPartitions;
+       private final KafkaTopicPartitionState<KPH>[] subscribedPartitionStates;
 
        /** The mode describing whether the fetcher also generates timestamps 
and watermarks */
        protected final int timestampWatermarkMode;
 
-       /** The startup mode for the consumer (only relevant if the consumer 
wasn't restored) */
-       protected final StartupMode startupMode;
-
        /** Flag whether to register metrics for the fetcher */
        protected final boolean useMetrics;
 
-       /** Flag whether or not the consumer state was restored from a 
checkpoint / savepoint */
-       protected final boolean isRestored;
-
        /** Only relevant for punctuated watermarks: The current cross 
partition watermark */
        private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
 
@@ -84,19 +76,16 @@ public abstract class AbstractFetcher<T, KPH> {
        
        protected AbstractFetcher(
                        SourceContext<T> sourceContext,
-                       List<KafkaTopicPartition> assignedPartitions,
-                       HashMap<KafkaTopicPartition, Long> 
restoredSnapshotState,
+                       Map<KafkaTopicPartition, Long> 
assignedPartitionsWithInitialOffsets,
                        SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
                        SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
                        ProcessingTimeService processingTimeProvider,
                        long autoWatermarkInterval,
                        ClassLoader userCodeClassLoader,
-                       StartupMode startupMode,
                        boolean useMetrics) throws Exception
        {
                this.sourceContext = checkNotNull(sourceContext);
                this.checkpointLock = sourceContext.getCheckpointLock();
-               this.startupMode = checkNotNull(startupMode);
                this.useMetrics = useMetrics;
                
                // figure out what we watermark mode we will be using
@@ -115,30 +104,25 @@ public abstract class AbstractFetcher<T, KPH> {
                                throw new IllegalArgumentException("Cannot have 
both periodic and punctuated watermarks");
                        }
                }
-               
+
                // create our partition state according to the 
timestamp/watermark mode 
-               this.allPartitions = initializePartitions(
-                               assignedPartitions,
+               this.subscribedPartitionStates = 
initializeSubscribedPartitionStates(
+                               assignedPartitionsWithInitialOffsets,
                                timestampWatermarkMode,
                                watermarksPeriodic, watermarksPunctuated,
                                userCodeClassLoader);
 
-               if (restoredSnapshotState != null) {
-                       for (KafkaTopicPartitionState<?> partition : 
allPartitions) {
-                               Long offset = 
restoredSnapshotState.get(partition.getKafkaTopicPartition());
-                               if (offset != null) {
-                                       partition.setOffset(offset);
-                               }
+               // check that all partition states have a defined offset
+               for (KafkaTopicPartitionState partitionState : 
subscribedPartitionStates) {
+                       if (!partitionState.isOffsetDefined()) {
+                               throw new IllegalArgumentException("The fetcher 
was assigned partitions with undefined initial offsets.");
                        }
-                       this.isRestored = true;
-               } else {
-                       this.isRestored = false;
                }
                
                // if we have periodic watermarks, kick off the interval 
scheduler
                if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
                        KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] 
parts = 
-                                       
(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
+                                       
(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) 
subscribedPartitionStates;
                        
                        PeriodicWatermarkEmitter periodicEmitter = 
                                        new PeriodicWatermarkEmitter(parts, 
sourceContext, processingTimeProvider, autoWatermarkInterval);
@@ -155,8 +139,8 @@ public abstract class AbstractFetcher<T, KPH> {
         *
         * @return All subscribed partitions.
         */
-       protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() {
-               return allPartitions;
+       protected final KafkaTopicPartitionState<KPH>[] 
subscribedPartitionStates() {
+               return subscribedPartitionStates;
        }
 
        // 
------------------------------------------------------------------------
@@ -207,8 +191,8 @@ public abstract class AbstractFetcher<T, KPH> {
                // this method assumes that the checkpoint lock is held
                assert Thread.holdsLock(checkpointLock);
 
-               HashMap<KafkaTopicPartition, Long> state = new 
HashMap<>(allPartitions.length);
-               for (KafkaTopicPartitionState<?> partition : 
subscribedPartitions()) {
+               HashMap<KafkaTopicPartition, Long> state = new 
HashMap<>(subscribedPartitionStates.length);
+               for (KafkaTopicPartitionState<?> partition : 
subscribedPartitionStates()) {
                        state.put(partition.getKafkaTopicPartition(), 
partition.getOffset());
                }
                return state;
@@ -343,7 +327,7 @@ public abstract class AbstractFetcher<T, KPH> {
                if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
                        long newMin = Long.MAX_VALUE;
 
-                       for (KafkaTopicPartitionState<?> state : allPartitions) 
{
+                       for (KafkaTopicPartitionState<?> state : 
subscribedPartitionStates) {
                                @SuppressWarnings("unchecked")
                                final 
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
                                                
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
@@ -371,8 +355,8 @@ public abstract class AbstractFetcher<T, KPH> {
         * Utility method that takes the topic partitions and creates the topic 
partition state
         * holders. If a watermark generator per partition exists, this will 
also initialize those.
         */
-       private KafkaTopicPartitionState<KPH>[] initializePartitions(
-                       List<KafkaTopicPartition> assignedPartitions,
+       private KafkaTopicPartitionState<KPH>[] 
initializeSubscribedPartitionStates(
+                       Map<KafkaTopicPartition, Long> 
assignedPartitionsToInitialOffsets,
                        int timestampWatermarkMode,
                        SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
                        SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
@@ -384,13 +368,16 @@ public abstract class AbstractFetcher<T, KPH> {
                        case NO_TIMESTAMPS_WATERMARKS: {
                                @SuppressWarnings("unchecked")
                                KafkaTopicPartitionState<KPH>[] partitions =
-                                               
(KafkaTopicPartitionState<KPH>[]) new 
KafkaTopicPartitionState<?>[assignedPartitions.size()];
+                                               
(KafkaTopicPartitionState<KPH>[]) new 
KafkaTopicPartitionState<?>[assignedPartitionsToInitialOffsets.size()];
 
                                int pos = 0;
-                               for (KafkaTopicPartition partition : 
assignedPartitions) {
+                               for (Map.Entry<KafkaTopicPartition, Long> 
partition : assignedPartitionsToInitialOffsets.entrySet()) {
                                        // create the kafka version specific 
partition handle
-                                       KPH kafkaHandle = 
createKafkaPartitionHandle(partition);
-                                       partitions[pos++] = new 
KafkaTopicPartitionState<>(partition, kafkaHandle);
+                                       KPH kafkaHandle = 
createKafkaPartitionHandle(partition.getKey());
+                                       partitions[pos] = new 
KafkaTopicPartitionState<>(partition.getKey(), kafkaHandle);
+                                       
partitions[pos].setOffset(partition.getValue());
+
+                                       pos++;
                                }
 
                                return partitions;
@@ -400,17 +387,20 @@ public abstract class AbstractFetcher<T, KPH> {
                                @SuppressWarnings("unchecked")
                                
KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
                                                
(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
-                                                               new 
KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()];
+                                                               new 
KafkaTopicPartitionStateWithPeriodicWatermarks<?, 
?>[assignedPartitionsToInitialOffsets.size()];
 
                                int pos = 0;
-                               for (KafkaTopicPartition partition : 
assignedPartitions) {
-                                       KPH kafkaHandle = 
createKafkaPartitionHandle(partition);
+                               for (Map.Entry<KafkaTopicPartition, Long> 
partition : assignedPartitionsToInitialOffsets.entrySet()) {
+                                       KPH kafkaHandle = 
createKafkaPartitionHandle(partition.getKey());
 
                                        AssignerWithPeriodicWatermarks<T> 
assignerInstance =
                                                        
watermarksPeriodic.deserializeValue(userCodeClassLoader);
                                        
-                                       partitions[pos++] = new 
KafkaTopicPartitionStateWithPeriodicWatermarks<>(
-                                                       partition, kafkaHandle, 
assignerInstance);
+                                       partitions[pos] = new 
KafkaTopicPartitionStateWithPeriodicWatermarks<>(
+                                                       partition.getKey(), 
kafkaHandle, assignerInstance);
+                                       
partitions[pos].setOffset(partition.getValue());
+
+                                       pos++;
                                }
 
                                return partitions;
@@ -420,17 +410,20 @@ public abstract class AbstractFetcher<T, KPH> {
                                @SuppressWarnings("unchecked")
                                
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
                                                
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[])
-                                                               new 
KafkaTopicPartitionStateWithPunctuatedWatermarks<?, 
?>[assignedPartitions.size()];
+                                                               new 
KafkaTopicPartitionStateWithPunctuatedWatermarks<?, 
?>[assignedPartitionsToInitialOffsets.size()];
 
                                int pos = 0;
-                               for (KafkaTopicPartition partition : 
assignedPartitions) {
-                                       KPH kafkaHandle = 
createKafkaPartitionHandle(partition);
+                               for (Map.Entry<KafkaTopicPartition, Long> 
partition : assignedPartitionsToInitialOffsets.entrySet()) {
+                                       KPH kafkaHandle = 
createKafkaPartitionHandle(partition.getKey());
 
                                        AssignerWithPunctuatedWatermarks<T> 
assignerInstance =
                                                        
watermarksPunctuated.deserializeValue(userCodeClassLoader);
 
-                                       partitions[pos++] = new 
KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
-                                                       partition, kafkaHandle, 
assignerInstance);
+                                       partitions[pos] = new 
KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
+                                                       partition.getKey(), 
kafkaHandle, assignerInstance);
+                                       
partitions[pos].setOffset(partition.getValue());
+
+                                       pos++;
                                }
 
                                return partitions;
@@ -452,7 +445,7 @@ public abstract class AbstractFetcher<T, KPH> {
                // add current offsets to gage
                MetricGroup currentOffsets = 
metricGroup.addGroup("current-offsets");
                MetricGroup committedOffsets = 
metricGroup.addGroup("committed-offsets");
-               for (KafkaTopicPartitionState<?> ktp: subscribedPartitions()) {
+               for (KafkaTopicPartitionState<?> ktp: 
subscribedPartitionStates()) {
                        currentOffsets.gauge(ktp.getTopic() + "-" + 
ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
                        committedOffsets.gauge(ktp.getTopic() + "-" + 
ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
index 7cb5f46..adfbf79 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
@@ -28,10 +28,6 @@ package 
org.apache.flink.streaming.connectors.kafka.internals;
  * @param <KPH> The type of the Kafka partition descriptor, which varies 
across Kafka versions.
  */
 public class KafkaTopicPartitionState<KPH> {
-
-       /** Magic number to define an unset offset. Negative offsets are not 
used by Kafka (invalid),
-        * and we pick a number that is probably (hopefully) not used by Kafka 
as a magic number for anything else. */
-       public static final long OFFSET_NOT_SET = -915623761776L;
        
        // 
------------------------------------------------------------------------
 
@@ -52,8 +48,8 @@ public class KafkaTopicPartitionState<KPH> {
        public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH 
kafkaPartitionHandle) {
                this.partition = partition;
                this.kafkaPartitionHandle = kafkaPartitionHandle;
-               this.offset = OFFSET_NOT_SET;
-               this.committedOffset = OFFSET_NOT_SET;
+               this.offset = KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET;
+               this.committedOffset = 
KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET;
        }
 
        // 
------------------------------------------------------------------------
@@ -96,7 +92,7 @@ public class KafkaTopicPartitionState<KPH> {
        }
 
        public final boolean isOffsetDefined() {
-               return offset != OFFSET_NOT_SET;
+               return offset != 
KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET;
        }
 
        public final void setCommittedOffset(long offset) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
new file mode 100644
index 0000000..153a326
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * Magic values used to represent special offset states before partitions are 
actually read.
+ *
+ * The values are all negative. Negative offsets are not used by Kafka 
(invalid), so we
+ * pick numbers that are probably (hopefully) not used by Kafka as magic 
numbers for anything else.
+ */
+public class KafkaTopicPartitionStateSentinel {
+
+       /** Magic number that defines an unset offset. */
+       public static final long OFFSET_NOT_SET = -915623761776L;
+
+       /**
+        * Magic number that defines the partition should start from the 
earliest offset.
+        *
+        * This is used as a placeholder so that the actual earliest offset can 
be evaluated lazily
+        * when the partition will actually start to be read by the consumer.
+        */
+       public static final long EARLIEST_OFFSET = -915623761775L;
+
+       /**
+        * Magic number that defines the partition should start from the latest 
offset.
+        *
+        * This is used as a placeholder so that the actual latest offset can 
be evaluated lazily
+        * when the partition will actually start to be read by the consumer.
+        */
+       public static final long LATEST_OFFSET = -915623761774L;
+
+       /**
+        * Magic number that defines the partition should start from its 
committed group offset in Kafka.
+        *
+        * This is used as a placeholder so that the actual committed group 
offset can be evaluated lazily
+        * when the partition will actually start to be read by the consumer.
+        */
+       public static final long GROUP_OFFSET = -915623761773L;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index 38a3ce8..20411e1 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
@@ -34,6 +35,7 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
 
@@ -68,8 +70,8 @@ public class FlinkKafkaConsumerBaseMigrationTest {
                testHarness.open();
 
                // assert that no partitions were found and is empty
-               Assert.assertTrue(consumerFunction.getSubscribedPartitions() != 
null);
-               
Assert.assertTrue(consumerFunction.getSubscribedPartitions().isEmpty());
+               
Assert.assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != 
null);
+               
Assert.assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
 
                // assert that no state was restored
                Assert.assertTrue(consumerFunction.getRestoredState() == null);
@@ -101,10 +103,16 @@ public class FlinkKafkaConsumerBaseMigrationTest {
                        
getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
                testHarness.open();
 
+               // the expected state in 
"kafka-consumer-migration-test-flink1.1-snapshot-empty-state";
+               // since the state is empty, the consumer should fall back to the 
startup mode to determine start offsets.
+               final HashMap<KafkaTopicPartition, Long> 
expectedSubscribedPartitionsWithStartOffsets = new HashMap<>();
+               expectedSubscribedPartitionsWithStartOffsets.put(new 
KafkaTopicPartition("abc", 13), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+               expectedSubscribedPartitionsWithStartOffsets.put(new 
KafkaTopicPartition("def", 7), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+
                // assert that there are partitions and is identical to 
expected list
-               Assert.assertTrue(consumerFunction.getSubscribedPartitions() != 
null);
-               
Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty());
-               
Assert.assertTrue(consumerFunction.getSubscribedPartitions().equals(partitions));
+               
Assert.assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != 
null);
+               
Assert.assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+               
Assert.assertEquals(expectedSubscribedPartitionsWithStartOffsets, 
consumerFunction.getSubscribedPartitionsToStartOffsets());
 
                // assert that no state was restored
                Assert.assertTrue(consumerFunction.getRestoredState() == null);
@@ -136,16 +144,18 @@ public class FlinkKafkaConsumerBaseMigrationTest {
                        
getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot"));
                testHarness.open();
 
-               // assert that there are partitions and is identical to 
expected list
-               Assert.assertTrue(consumerFunction.getSubscribedPartitions() != 
null);
-               
Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty());
-               Assert.assertEquals(partitions, 
consumerFunction.getSubscribedPartitions());
-
                // the expected state in 
"kafka-consumer-migration-test-flink1.1-snapshot"
                final HashMap<KafkaTopicPartition, Long> expectedState = new 
HashMap<>();
                expectedState.put(new KafkaTopicPartition("abc", 13), 16768L);
                expectedState.put(new KafkaTopicPartition("def", 7), 
987654321L);
 
+               // assert that there are partitions and that they are identical to the 
expected list
+               
Assert.assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != 
null);
+               
Assert.assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+
+               // on restore, subscribedPartitionsToStartOffsets should be 
identical to the restored state
+               Assert.assertEquals(expectedState, 
consumerFunction.getSubscribedPartitionsToStartOffsets());
+
                // assert that state is correctly restored from legacy 
checkpoint
                Assert.assertTrue(consumerFunction.getRestoredState() != null);
                Assert.assertEquals(expectedState, 
consumerFunction.getRestoredState());
@@ -179,8 +189,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
                @Override
                protected AbstractFetcher<T, ?> createFetcher(
                                SourceContext<T> sourceContext,
-                               List<KafkaTopicPartition> thisSubtaskPartitions,
-                               HashMap<KafkaTopicPartition, Long> 
restoredSnapshotState,
+                               Map<KafkaTopicPartition, Long> 
thisSubtaskPartitionsWithStartOffsets,
                                
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
                                
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
                                StreamingRuntimeContext runtimeContext) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 980a025..e6ea63f 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
@@ -42,13 +43,7 @@ import org.mockito.stubbing.Answer;
 
 import java.io.Serializable;
 import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -137,6 +132,8 @@ public class FlinkKafkaConsumerBaseTest {
 
                consumer.initializeState(initializationContext);
 
+               consumer.open(new Configuration());
+
                consumer.snapshotState(new 
StateSnapshotContextSynchronousImpl(17, 17));
 
                // ensure that the list was cleared and refilled. while this is 
an implementation detail, we use it here
@@ -177,6 +174,8 @@ public class FlinkKafkaConsumerBaseTest {
 
                consumer.initializeState(initializationContext);
 
+               consumer.open(new Configuration());
+
                consumer.snapshotState(new 
StateSnapshotContextSynchronousImpl(17, 17));
 
                assertFalse(listState.get().iterator().hasNext());
@@ -364,15 +363,10 @@ public class FlinkKafkaConsumerBaseTest {
                @SuppressWarnings("unchecked")
                protected AbstractFetcher<T, ?> createFetcher(
                                SourceContext<T> sourceContext,
-                               List<KafkaTopicPartition> thisSubtaskPartitions,
-                               HashMap<KafkaTopicPartition, Long> 
restoredSnapshotState,
+                               Map<KafkaTopicPartition, Long> 
thisSubtaskPartitionsWithStartOffsets,
                                
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
                                
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
                                StreamingRuntimeContext runtimeContext) throws 
Exception {
-                       if (restoredSnapshotState != null) {
-                               Assert.fail("Trying to restore offsets even 
though there was no restore state.");
-                               return null;
-                       }
                        return mock(AbstractFetcher.class);
                }
 


Reply via email to