http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
new file mode 100644
index 0000000..99c5d69
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * A special version of the per-kafka-partition-state that additionally holds
+ * a periodic watermark generator (and timestamp extractor) per partition.
+ * 
+ * @param <T> The type of records handled by the watermark generator
+ * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
+ */
+public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
+       
+       /** The timestamp assigner and watermark generator for the partition */
+       private final AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks;
+       
+       /** The last watermark timestamp generated by this partition */
+       private long partitionWatermark;
+
+       // ------------------------------------------------------------------------
+       
+       public KafkaTopicPartitionStateWithPeriodicWatermarks(
+                       KafkaTopicPartition partition, KPH kafkaPartitionHandle,
+                       AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks)
+       {
+               super(partition, kafkaPartitionHandle);
+               
+               this.timestampsAndWatermarks = timestampsAndWatermarks;
+               this.partitionWatermark = Long.MIN_VALUE;
+       }
+
+       // ------------------------------------------------------------------------
+       
+       public long getTimestampForRecord(T record) {
+               return timestampsAndWatermarks.extractTimestamp(record, Long.MIN_VALUE);
+       }
+       
+       public long getCurrentWatermarkTimestamp() {
+               Watermark wm = timestampsAndWatermarks.getCurrentWatermark();
+               if (wm != null) {
+                       partitionWatermark = Math.max(partitionWatermark, wm.getTimestamp());
+               }
+               return partitionWatermark;
+       }
+
+       // ------------------------------------------------------------------------
+       
+       @Override
+       public String toString() {
+               return "KafkaTopicPartitionStateWithPeriodicWatermarks: partition=" + getKafkaTopicPartition()
+                               + ", offset=" + getOffset() + ", watermark=" + partitionWatermark;
+       }
+}
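
For context: the per-partition state above is typically combined by the fetcher, which periodically emits the minimum watermark across all partitions so that event time never overtakes the slowest partition. The standalone sketch below only illustrates that pattern; the class and method names are illustrative, not Flink API.

import java.util.Arrays;
import java.util.List;

/** Sketch of the "minimum across partitions" periodic watermark pattern. Illustrative only. */
public class PeriodicWatermarkSketch {

    /** Simplified stand-in for the per-partition state: remembers the last watermark it produced. */
    static final class PartitionState {
        private long partitionWatermark = Long.MIN_VALUE;

        void onRecord(long eventTimestamp) {
            // a real assigner may lag behind the record timestamps; here we simply track the max seen
            partitionWatermark = Math.max(partitionWatermark, eventTimestamp);
        }

        long getCurrentWatermarkTimestamp() {
            return partitionWatermark;
        }
    }

    public static void main(String[] args) {
        List<PartitionState> partitions = Arrays.asList(new PartitionState(), new PartitionState());
        partitions.get(0).onRecord(100L);
        partitions.get(1).onRecord(250L);

        // the emitted watermark is the minimum over all partitions
        long min = Long.MAX_VALUE;
        for (PartitionState p : partitions) {
            min = Math.min(min, p.getCurrentWatermarkTimestamp());
        }
        System.out.println("emit watermark " + min); // prints 100
    }
}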

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
new file mode 100644
index 0000000..b265990
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import javax.annotation.Nullable;
+
+/**
+ * A special version of the per-kafka-partition-state that additionally holds
+ * a punctuated watermark generator (and timestamp extractor) per partition.
+ * 
+ * <p>This class is not thread safe, but it gives volatile access to the current
+ * partition watermark ({@link #getCurrentPartitionWatermark()}).
+ * 
+ * @param <T> The type of records handled by the watermark generator
+ * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions
+ */
+public final class KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
+       
+       /** The timestamp assigner and watermark generator for the partition */
+       private final AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks;
+       
+       /** The last watermark timestamp generated by this partition */
+       private volatile long partitionWatermark;
+
+       // ------------------------------------------------------------------------
+       
+       public KafkaTopicPartitionStateWithPunctuatedWatermarks(
+                       KafkaTopicPartition partition, KPH kafkaPartitionHandle,
+                       AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks)
+       {
+               super(partition, kafkaPartitionHandle);
+               
+               this.timestampsAndWatermarks = timestampsAndWatermarks;
+               this.partitionWatermark = Long.MIN_VALUE;
+       }
+
+       // ------------------------------------------------------------------------
+       
+       public long getTimestampForRecord(T record) {
+               return timestampsAndWatermarks.extractTimestamp(record, Long.MIN_VALUE);
+       }
+
+       @Nullable
+       public Watermark checkAndGetNewWatermark(T record, long timestamp) {
+               Watermark mark = timestampsAndWatermarks.checkAndGetNextWatermark(record, timestamp);
+               if (mark != null && mark.getTimestamp() > partitionWatermark) {
+                       partitionWatermark = mark.getTimestamp();
+                       return mark;
+               }
+               else {
+                       return null;
+               }
+       }
+       
+       public long getCurrentPartitionWatermark() {
+               return partitionWatermark;
+       }
+
+       // ------------------------------------------------------------------------
+       
+       @Override
+       public String toString() {
+               return "KafkaTopicPartitionStateWithPunctuatedWatermarks: partition=" + getKafkaTopicPartition()
+                               + ", offset=" + getOffset() + ", watermark=" + partitionWatermark;
+       }
+}
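
For context: in the punctuated variant, checkAndGetNewWatermark() above only returns a watermark when the candidate advances the partition's last watermark, so per-partition watermarks never move backwards. A minimal standalone sketch of that guard follows; the names are illustrative, not Flink API.

/** Sketch of the punctuated (per-record) watermark guard. Illustrative only. */
public class PunctuatedWatermarkSketch {

    private long partitionWatermark = Long.MIN_VALUE;

    /** Returns the new watermark timestamp if this candidate advanced it, otherwise null. */
    Long checkAndGetNewWatermark(long candidateWatermark) {
        if (candidateWatermark > partitionWatermark) {
            partitionWatermark = candidateWatermark;
            return candidateWatermark;
        }
        return null; // no progress, nothing to propagate
    }

    public static void main(String[] args) {
        PunctuatedWatermarkSketch state = new PunctuatedWatermarkSketch();
        System.out.println(state.checkAndGetNewWatermark(10L)); // 10
        System.out.println(state.checkAndGetNewWatermark(5L));  // null (would move backwards)
        System.out.println(state.checkAndGetNewWatermark(12L)); // 12
    }
}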

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
index 038f414..37e2ef6 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -14,8 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kafka.partitioner;
 
+package org.apache.flink.streaming.connectors.kafka.partitioner;
 
 import java.io.Serializable;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
index 1be6b00..bda90bd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
@@ -14,8 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kafka.util;
 
+package org.apache.flink.streaming.connectors.kafka.util;
 
 import java.util.Properties;
 
@@ -38,10 +38,9 @@ public class KafkaUtils {
                                        "Entered value='" + 
config.getProperty(key) + "'. Default value='" + defaultValue + "'");
                }
        }
-
-       public static void checkArgument(boolean arg) {
-               if(!arg) {
-                       throw new IllegalArgumentException();
-               }
-       }
+       
+       // ------------------------------------------------------------------------
+       
+       /** Private default constructor to prevent instantiation */
+       private KafkaUtils() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
new file mode 100644
index 0000000..f4ef995
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class FlinkKafkaConsumerBaseTest {
+
+       /**
+        * Tests that not both types of timestamp extractors / watermark generators can be used.
+        */
+       @Test
+       public void testEitherWatermarkExtractor() {
+               try {
+                       new DummyFlinkKafkaConsumer<>().setPeriodicWatermarkEmitter(null);
+                       fail();
+               } catch (NullPointerException ignored) {}
+
+               try {
+                       new DummyFlinkKafkaConsumer<>().setPunctuatedWatermarkEmitter(null);
+                       fail();
+               } catch (NullPointerException ignored) {}
+               
+               @SuppressWarnings("unchecked")
+               final AssignerWithPeriodicWatermarks<String> periodicAssigner = mock(AssignerWithPeriodicWatermarks.class);
+               @SuppressWarnings("unchecked")
+               final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
+               
+               DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>();
+               c1.setPeriodicWatermarkEmitter(periodicAssigner);
+               try {
+                       c1.setPunctuatedWatermarkEmitter(punctuatedAssigner);
+                       fail();
+               } catch (IllegalStateException ignored) {}
+
+               DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>();
+               c2.setPunctuatedWatermarkEmitter(punctuatedAssigner);
+               try {
+                       c2.setPeriodicWatermarkEmitter(periodicAssigner);
+                       fail();
+               } catch (IllegalStateException ignored) {}
+       }
+
+       /**
+        * Tests that no checkpoints happen when the fetcher is not running.
+        */
+       @Test
+       public void ignoreCheckpointWhenNotRunning() throws Exception {
+               @SuppressWarnings("unchecked")
+               final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+
+               FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, new LinkedMap(), false);
+               assertNull(consumer.snapshotState(17L, 23L));
+               consumer.notifyCheckpointComplete(66L);
+       }
+
+       /**
+        * Tests that the restored state is returned on snapshot when the fetcher is not yet ready.
+        */
+       @Test
+       public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception {
+               HashMap<KafkaTopicPartition, Long> restoreState = new HashMap<>();
+               restoreState.put(new KafkaTopicPartition("abc", 13), 16768L);
+               restoreState.put(new KafkaTopicPartition("def", 7), 987654321L);
+
+               FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
+               consumer.restoreState(restoreState);
+               
+               assertEquals(restoreState, consumer.snapshotState(17L, 23L));
+       }
+
+       /**
+        * Tests that a null state is returned on snapshot when no state was restored and the fetcher is not yet ready.
+        */
+       @Test
+       public void checkRestoredNullCheckpointWhenFetcherNotReady() throws Exception {
+               FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
+               assertNull(consumer.snapshotState(17L, 23L));
+       }
+       
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testSnapshotState() throws Exception {
+               final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
+               state1.put(new KafkaTopicPartition("abc", 13), 16768L);
+               state1.put(new KafkaTopicPartition("def", 7), 987654321L);
+
+               final HashMap<KafkaTopicPartition, Long> state2 = new HashMap<>();
+               state2.put(new KafkaTopicPartition("abc", 13), 16770L);
+               state2.put(new KafkaTopicPartition("def", 7), 987654329L);
+
+               final HashMap<KafkaTopicPartition, Long> state3 = new HashMap<>();
+               state3.put(new KafkaTopicPartition("abc", 13), 16780L);
+               state3.put(new KafkaTopicPartition("def", 7), 987654377L);
+               
+               final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+               when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
+                       
+               final LinkedMap pendingCheckpoints = new LinkedMap();
+       
+               FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingCheckpoints, true);
+               assertEquals(0, pendingCheckpoints.size());
+               
+               // checkpoint 1
+               HashMap<KafkaTopicPartition, Long> snapshot1 = consumer.snapshotState(138L, 19L);
+               assertEquals(state1, snapshot1);
+               assertEquals(1, pendingCheckpoints.size());
+               assertEquals(state1, pendingCheckpoints.get(138L));
+
+               // checkpoint 2
+               HashMap<KafkaTopicPartition, Long> snapshot2 = consumer.snapshotState(140L, 1578L);
+               assertEquals(state2, snapshot2);
+               assertEquals(2, pendingCheckpoints.size());
+               assertEquals(state2, pendingCheckpoints.get(140L));
+               
+               // ack checkpoint 1
+               consumer.notifyCheckpointComplete(138L);
+               assertEquals(1, pendingCheckpoints.size());
+               assertTrue(pendingCheckpoints.containsKey(140L));
+
+               // checkpoint 3
+               HashMap<KafkaTopicPartition, Long> snapshot3 = consumer.snapshotState(141L, 1578L);
+               assertEquals(state3, snapshot3);
+               assertEquals(2, pendingCheckpoints.size());
+               assertEquals(state3, pendingCheckpoints.get(141L));
+               
+               // ack checkpoint 3, subsumes number 2
+               consumer.notifyCheckpointComplete(141L);
+               assertEquals(0, pendingCheckpoints.size());
+
+
+               consumer.notifyCheckpointComplete(666); // invalid checkpoint
+               assertEquals(0, pendingCheckpoints.size());
+
+               // create 500 snapshots
+               for (int i = 100; i < 600; i++) {
+                       consumer.snapshotState(i, 15 * i);
+               }
+               assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
+
+               // commit only the second last
+               consumer.notifyCheckpointComplete(598);
+               assertEquals(1, pendingCheckpoints.size());
+
+               // access invalid checkpoint
+               consumer.notifyCheckpointComplete(590);
+
+               // and the last
+               consumer.notifyCheckpointComplete(599);
+               assertEquals(0, pendingCheckpoints.size());
+       }
+
+       // ------------------------------------------------------------------------
+
+       private static <T> FlinkKafkaConsumerBase<T> getConsumer(
+                       AbstractFetcher<T, ?> fetcher, LinkedMap pendingCheckpoints, boolean running) throws Exception
+       {
+               FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>();
+
+               Field fetcherField = FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher");
+               fetcherField.setAccessible(true);
+               fetcherField.set(consumer, fetcher);
+
+               Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints");
+               mapField.setAccessible(true);
+               mapField.set(consumer, pendingCheckpoints);
+
+               Field runningField = FlinkKafkaConsumerBase.class.getDeclaredField("running");
+               runningField.setAccessible(true);
+               runningField.set(consumer, running);
+
+               return consumer;
+       }
+
+       // ------------------------------------------------------------------------
+
+       private static final class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
+               private static final long serialVersionUID = 1L;
+
+               @SuppressWarnings("unchecked")
+               public DummyFlinkKafkaConsumer() {
+                       super((KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class));
+               }
+
+               @Override
+               protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception {
+                       return null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index e86d51a..9beed22 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -18,10 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
-import org.apache.kafka.common.Node;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -32,30 +30,27 @@ import java.util.Set;
 
 import static org.junit.Assert.*;
 
-
 /**
  * Tests that the partition assignment is deterministic and stable.
  */
 public class KafkaConsumerPartitionAssignmentTest {
 
-       private final Node fake = new Node(1337, "localhost", 1337);
-
        @Test
        public void testPartitionsEqualConsumers() {
                try {
-                       List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>();
-                       inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake));
-                       inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake));
-                       inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake));
-                       inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake));
+                       List<KafkaTopicPartition> inPartitions = Arrays.asList(
+                                       new KafkaTopicPartition("test-topic", 4),
+                                       new KafkaTopicPartition("test-topic", 52),
+                                       new KafkaTopicPartition("test-topic", 17),
+                                       new KafkaTopicPartition("test-topic", 1));
 
                        for (int i = 0; i < inPartitions.size(); i++) {
-                               List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions(
-                                               inPartitions, inPartitions.size(), i);
+                               List<KafkaTopicPartition> parts = 
+                                               FlinkKafkaConsumerBase.assignPartitions(inPartitions, inPartitions.size(), i);
 
                                assertNotNull(parts);
                                assertEquals(1, parts.size());
-                               assertTrue(contains(inPartitions, parts.get(0).getTopicPartition().getPartition()));
+                               assertTrue(contains(inPartitions, parts.get(0).getPartition()));
                        }
                }
                catch (Exception e) {
@@ -64,9 +59,9 @@ public class KafkaConsumerPartitionAssignmentTest {
                }
        }
 
-       private boolean contains(List<KafkaTopicPartitionLeader> inPartitions, int partition) {
-               for (KafkaTopicPartitionLeader ktp: inPartitions) {
-                       if (ktp.getTopicPartition().getPartition() == partition) {
+       private boolean contains(List<KafkaTopicPartition> inPartitions, int partition) {
+               for (KafkaTopicPartition ktp : inPartitions) {
+                       if (ktp.getPartition() == partition) {
                                return true;
                        }
                }
@@ -78,11 +73,11 @@ public class KafkaConsumerPartitionAssignmentTest {
                try {
                        final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
 
-                       final List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
-                       final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>();
+                       final List<KafkaTopicPartition> partitions = new ArrayList<>();
+                       final Set<KafkaTopicPartition> allPartitions = new HashSet<>();
 
                        for (int p : partitionIDs) {
-                               KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake);
+                               KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p);
                                partitions.add(part);
                                allPartitions.add(part);
                        }
@@ -92,13 +87,14 @@ public class KafkaConsumerPartitionAssignmentTest {
                        final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1;
 
                        for (int i = 0; i < numConsumers; i++) {
-                               List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i);
+                               List<KafkaTopicPartition> parts = 
+                                               FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i);
 
                                assertNotNull(parts);
                                assertTrue(parts.size() >= minPartitionsPerConsumer);
                                assertTrue(parts.size() <= maxPartitionsPerConsumer);
 
-                               for (KafkaTopicPartitionLeader p : parts) {
+                               for (KafkaTopicPartition p : parts) {
                                        // check that the element was actually contained
                                        assertTrue(allPartitions.remove(p));
                                }
@@ -116,24 +112,24 @@ public class KafkaConsumerPartitionAssignmentTest {
        @Test
        public void testPartitionsFewerThanConsumers() {
                try {
-                       List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>();
-                       inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake));
-                       inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake));
-                       inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake));
-                       inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake));
+                       List<KafkaTopicPartition> inPartitions = Arrays.asList(
+                                       new KafkaTopicPartition("test-topic", 4),
+                                       new KafkaTopicPartition("test-topic", 52),
+                                       new KafkaTopicPartition("test-topic", 17),
+                                       new KafkaTopicPartition("test-topic", 1));
 
-                       final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>();
+                       final Set<KafkaTopicPartition> allPartitions = new HashSet<>();
                        allPartitions.addAll(inPartitions);
 
                        final int numConsumers = 2 * inPartitions.size() + 3;
 
                        for (int i = 0; i < numConsumers; i++) {
-                               List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i);
+                               List<KafkaTopicPartition> parts = FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i);
 
                                assertNotNull(parts);
                                assertTrue(parts.size() <= 1);
 
-                               for (KafkaTopicPartitionLeader p : parts) {
+                               for (KafkaTopicPartition p : parts) {
                                        // check that the element was actually contained
                                        assertTrue(allPartitions.remove(p));
                                }
@@ -151,12 +147,12 @@ public class KafkaConsumerPartitionAssignmentTest {
        @Test
        public void testAssignEmptyPartitions() {
                try {
-                       List<KafkaTopicPartitionLeader> ep = new ArrayList<>();
-                       List<KafkaTopicPartitionLeader> parts1 = FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2);
+                       List<KafkaTopicPartition> ep = new ArrayList<>();
+                       List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2);
                        assertNotNull(parts1);
                        assertTrue(parts1.isEmpty());
 
-                       List<KafkaTopicPartitionLeader> parts2 = FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0);
+                       List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0);
                        assertNotNull(parts2);
                        assertTrue(parts2.isEmpty());
                }
@@ -170,17 +166,17 @@ public class KafkaConsumerPartitionAssignmentTest {
        public void testGrowingPartitionsRemainsStable() {
                try {
                        final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
-                       List<KafkaTopicPartitionLeader> newPartitions = new ArrayList<>();
+                       List<KafkaTopicPartition> newPartitions = new ArrayList<>();
 
                        for (int p : newPartitionIDs) {
-                               KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake);
+                               KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p);
                                newPartitions.add(part);
                        }
 
-                       List<KafkaTopicPartitionLeader> initialPartitions = newPartitions.subList(0, 7);
+                       List<KafkaTopicPartition> initialPartitions = newPartitions.subList(0, 7);
 
-                       final Set<KafkaTopicPartitionLeader> allNewPartitions = new HashSet<>(newPartitions);
-                       final Set<KafkaTopicPartitionLeader> allInitialPartitions = new HashSet<>(initialPartitions);
+                       final Set<KafkaTopicPartition> allNewPartitions = new HashSet<>(newPartitions);
+                       final Set<KafkaTopicPartition> allInitialPartitions = new HashSet<>(initialPartitions);
 
                        final int numConsumers = 3;
                        final int minInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers;
@@ -188,11 +184,11 @@ public class KafkaConsumerPartitionAssignmentTest {
                        final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers;
                        final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1;
 
-                       List<KafkaTopicPartitionLeader> parts1 = FlinkKafkaConsumerBase.assignPartitions(
+                       List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(
                                        initialPartitions, numConsumers, 0);
-                       List<KafkaTopicPartitionLeader> parts2 = FlinkKafkaConsumerBase.assignPartitions(
+                       List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(
                                        initialPartitions, numConsumers, 1);
-                       List<KafkaTopicPartitionLeader> parts3 = FlinkKafkaConsumerBase.assignPartitions(
+                       List<KafkaTopicPartition> parts3 = FlinkKafkaConsumerBase.assignPartitions(
                                        initialPartitions, numConsumers, 2);
 
                        assertNotNull(parts1);
@@ -206,15 +202,15 @@ public class KafkaConsumerPartitionAssignmentTest {
                        assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
                        assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
 
-                       for (KafkaTopicPartitionLeader p : parts1) {
+                       for (KafkaTopicPartition p : parts1) {
                                // check that the element was actually contained
                                assertTrue(allInitialPartitions.remove(p));
                        }
-                       for (KafkaTopicPartitionLeader p : parts2) {
+                       for (KafkaTopicPartition p : parts2) {
                                // check that the element was actually contained
                                assertTrue(allInitialPartitions.remove(p));
                        }
-                       for (KafkaTopicPartitionLeader p : parts3) {
+                       for (KafkaTopicPartition p : parts3) {
                                // check that the element was actually contained
                                assertTrue(allInitialPartitions.remove(p));
                        }
@@ -224,11 +220,11 @@ public class KafkaConsumerPartitionAssignmentTest {
 
                        // grow the set of partitions and distribute anew
 
-                       List<KafkaTopicPartitionLeader> parts1new = FlinkKafkaConsumerBase.assignPartitions(
+                       List<KafkaTopicPartition> parts1new = FlinkKafkaConsumerBase.assignPartitions(
                                        newPartitions, numConsumers, 0);
-                       List<KafkaTopicPartitionLeader> parts2new = FlinkKafkaConsumerBase.assignPartitions(
+                       List<KafkaTopicPartition> parts2new = FlinkKafkaConsumerBase.assignPartitions(
                                        newPartitions, numConsumers, 1);
-                       List<KafkaTopicPartitionLeader> parts3new = FlinkKafkaConsumerBase.assignPartitions(
+                       List<KafkaTopicPartition> parts3new = FlinkKafkaConsumerBase.assignPartitions(
                                        newPartitions, numConsumers, 2);
 
                        // new partitions must include all old partitions
@@ -248,15 +244,15 @@ public class KafkaConsumerPartitionAssignmentTest {
                        assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
                        assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
 
-                       for (KafkaTopicPartitionLeader p : parts1new) {
+                       for (KafkaTopicPartition p : parts1new) {
                                // check that the element was actually contained
                                assertTrue(allNewPartitions.remove(p));
                        }
-                       for (KafkaTopicPartitionLeader p : parts2new) {
+                       for (KafkaTopicPartition p : parts2new) {
                                // check that the element was actually contained
                                assertTrue(allNewPartitions.remove(p));
                        }
-                       for (KafkaTopicPartitionLeader p : parts3new) {
+                       for (KafkaTopicPartition p : parts3new) {
                                // check that the element was actually contained
                                assertTrue(allNewPartitions.remove(p));
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 340950b..aa5344b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -26,8 +26,6 @@ import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
 import kafka.server.KafkaServer;
 
-import org.apache.commons.collections.map.LinkedMap;
-
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -66,12 +64,10 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
-import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
@@ -98,7 +94,6 @@ import org.junit.Rule;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -176,70 +171,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        }
                }
        }
-       /**
-        * Test that validates that checkpointing and checkpoint notification works properly
-        */
-       public void runCheckpointingTest() throws Exception {
-               createTestTopic("testCheckpointing", 1, 1);
-
-               FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("testCheckpointing", new SimpleStringSchema(), standardProps);
-               Field pendingCheckpointsField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints");
-               pendingCheckpointsField.setAccessible(true);
-               LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
-
-               Assert.assertEquals(0, pendingCheckpoints.size());
-               source.setRuntimeContext(new MockRuntimeContext(1, 0));
-
-               final HashMap<KafkaTopicPartition, Long> initialOffsets = new HashMap<>();
-               initialOffsets.put(new KafkaTopicPartition("testCheckpointing", 0), 1337L);
-
-               // first restore
-               source.restoreState(initialOffsets);
-
-               // then open
-               source.open(new Configuration());
-               HashMap<KafkaTopicPartition, Long> state1 = source.snapshotState(1, 15);
-
-               assertEquals(initialOffsets, state1);
-
-               HashMap<KafkaTopicPartition, Long> state2 = source.snapshotState(2, 30);
-               Assert.assertEquals(initialOffsets, state2);
-
-               Assert.assertEquals(2, pendingCheckpoints.size());
-
-               source.notifyCheckpointComplete(1);
-               Assert.assertEquals(1, pendingCheckpoints.size());
-
-               source.notifyCheckpointComplete(2);
-               Assert.assertEquals(0, pendingCheckpoints.size());
-
-               source.notifyCheckpointComplete(666); // invalid checkpoint
-               Assert.assertEquals(0, pendingCheckpoints.size());
-
-               // create 500 snapshots
-               for (int i = 100; i < 600; i++) {
-                       source.snapshotState(i, 15 * i);
-               }
-               Assert.assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
-
-               // commit only the second last
-               source.notifyCheckpointComplete(598);
-               Assert.assertEquals(1, pendingCheckpoints.size());
-
-               // access invalid checkpoint
-               source.notifyCheckpointComplete(590);
-
-               // and the last
-               source.notifyCheckpointComplete(599);
-               Assert.assertEquals(0, pendingCheckpoints.size());
-
-               source.close();
-
-               deleteTestTopic("testCheckpointing");
-       }
-
-
-
+       
        /**
         * Ensure Kafka is working on both producer and consumer side.
         * This executes a job that contains two Flink pipelines.
@@ -409,7 +341,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
                env.enableCheckpointing(500);
                env.setParallelism(parallelism);
-               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));
+               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
                env.getConfig().disableSysoutLogging();
 
                FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
@@ -454,7 +386,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
                env.enableCheckpointing(500);
                env.setParallelism(parallelism);
-               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));
+               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
                env.getConfig().disableSysoutLogging();
 
                FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
@@ -499,7 +431,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
                env.enableCheckpointing(500);
                env.setParallelism(parallelism);
-               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
+               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
                env.getConfig().disableSysoutLogging();
                env.setBufferTimeout(0);
 
@@ -562,7 +494,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                runnerThread.start();
 
                // wait a bit before canceling
-               Thread.sleep(8000);
+               Thread.sleep(2000);
 
                Throwable failueCause = jobError.get();
                if(failueCause != null) {
@@ -634,10 +566,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                runnerThread.start();
 
                // wait a bit before canceling
-               Thread.sleep(8000);
+               Thread.sleep(2000);
 
                Throwable failueCause = error.get();
-               if(failueCause != null) {
+               if (failueCause != null) {
                        failueCause.printStackTrace();
                        Assert.fail("Test failed prematurely with: " + failueCause.getMessage());
                }
@@ -709,7 +641,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                final int NUM_ELEMENTS = 20;
 
                StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-
+               env.getConfig().disableSysoutLogging();
+               
                // create topics with content
                final List<String> topics = new ArrayList<>();
                for (int i = 0; i < NUM_TOPICS; i++) {
@@ -745,6 +678,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
                // run second job consuming from multiple topics
                env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env.getConfig().disableSysoutLogging();
+               
                stream = env.addSource(kafkaServer.getConsumer(topics, schema, standardProps));
 
                stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() {
@@ -1453,50 +1388,50 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
        /////////////                   Testing the Kafka consumer with embeded watermark generation functionality                      ///////////////
 
-       @RetryOnException(times=0, exception=kafka.common.NotLeaderForPartitionException.class)
-       public void runExplicitPunctuatedWMgeneratingConsumerTest(boolean emptyPartition) throws Exception {
-
-               final String topic1 = "wmExtractorTopic1_" + UUID.randomUUID().toString();
-               final String topic2 = "wmExtractorTopic2_" + UUID.randomUUID().toString();
-
-               final Map<String, Boolean> topics = new HashMap<>();
-               topics.put(topic1, false);
-               topics.put(topic2, emptyPartition);
-
-               final int noOfTopcis = topics.size();
-               final int partitionsPerTopic = 1;
-               final int elementsPerPartition = 100 + 1;
-
-               final int totalElements = emptyPartition ?
-                       partitionsPerTopic * elementsPerPartition :
-                       noOfTopcis * partitionsPerTopic * elementsPerPartition;
-
-               createTestTopic(topic1, partitionsPerTopic, 1);
-               createTestTopic(topic2, partitionsPerTopic, 1);
-
-               final StreamExecutionEnvironment env =
-                       StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               env.setParallelism(partitionsPerTopic);
-               env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
-               env.getConfig().disableSysoutLogging();
-
-               TypeInformation<Tuple2<Long, Integer>> longIntType = TypeInfoParser.parse("Tuple2<Long, Integer>");
-
-               Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
-               producerProperties.setProperty("retries", "0");
-
-               putDataInTopics(env, producerProperties, elementsPerPartition, topics, longIntType);
-
-               List<String> topicTitles = new ArrayList<>(topics.keySet());
-               runPunctuatedComsumer(env, topicTitles, totalElements, longIntType);
-
-               executeAndCatchException(env, "runComsumerWithPunctuatedExplicitWMTest");
-
-               for(String topic: topicTitles) {
-                       deleteTestTopic(topic);
-               }
-       }
+//     @RetryOnException(times=0, exception=kafka.common.NotLeaderForPartitionException.class)
+//     public void runExplicitPunctuatedWMgeneratingConsumerTest(boolean emptyPartition) throws Exception {
+//
+//             final String topic1 = "wmExtractorTopic1_" + UUID.randomUUID().toString();
+//             final String topic2 = "wmExtractorTopic2_" + UUID.randomUUID().toString();
+//
+//             final Map<String, Boolean> topics = new HashMap<>();
+//             topics.put(topic1, false);
+//             topics.put(topic2, emptyPartition);
+//
+//             final int noOfTopcis = topics.size();
+//             final int partitionsPerTopic = 1;
+//             final int elementsPerPartition = 100 + 1;
+//
+//             final int totalElements = emptyPartition ?
+//                     partitionsPerTopic * elementsPerPartition :
+//                     noOfTopcis * partitionsPerTopic * elementsPerPartition;
+//
+//             createTestTopic(topic1, partitionsPerTopic, 1);
+//             createTestTopic(topic2, partitionsPerTopic, 1);
+//
+//             final StreamExecutionEnvironment env =
+//                     StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+//             env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+//             env.setParallelism(partitionsPerTopic);
+//             env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
+//             env.getConfig().disableSysoutLogging();
+//
+//             TypeInformation<Tuple2<Long, Integer>> longIntType = TypeInfoParser.parse("Tuple2<Long, Integer>");
+//
+//             Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+//             producerProperties.setProperty("retries", "0");
+//
+//             putDataInTopics(env, producerProperties, elementsPerPartition, topics, longIntType);
+//
+//             List<String> topicTitles = new ArrayList<>(topics.keySet());
+//             runPunctuatedComsumer(env, topicTitles, totalElements, longIntType);
+//
+//             executeAndCatchException(env, "runComsumerWithPunctuatedExplicitWMTest");
+//
+//             for(String topic: topicTitles) {
+//                     deleteTestTopic(topic);
+//             }
+//     }
 
        private void executeAndCatchException(StreamExecutionEnvironment env, String execName) throws Exception {
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index e251174..14e74f1 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -167,7 +167,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
        // ------------------------------------------------------------------------
 
-       public static class CustomPartitioner extends KafkaPartitioner implements Serializable {
+       public static class CustomPartitioner extends KafkaPartitioner<Tuple2<Long, String>> implements Serializable {
 
                private final int expectedPartitions;
 
@@ -177,12 +177,10 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
 
                @Override
-               public int partition(Object next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
-                       Tuple2<Long, String> tuple = (Tuple2<Long, String>) next;
-
+               public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
                        assertEquals(expectedPartitions, numPartitions);
 
-                       return (int) (tuple.f0 % numPartitions);
+                       return (int) (next.f0 % numPartitions);
                }
        }
 }
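
For context: the change above parameterizes KafkaPartitioner so that partition() receives the record already typed and the unchecked cast to Tuple2 disappears. A tiny self-contained sketch of the same idea follows, with illustrative names rather than the Flink API.

/** Sketch: a generic partitioner interface removes the cast from Object. Illustrative only. */
public class TypedPartitionerSketch {

    interface Partitioner<T> {
        int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);
    }

    static final class ModuloPartitioner implements Partitioner<Long> {
        @Override
        public int partition(Long next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
            return (int) (next % numPartitions); // no cast from Object needed
        }
    }

    public static void main(String[] args) {
        System.out.println(new ModuloPartitioner().partition(42L, null, null, 5)); // prints 2
    }
}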

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 9f8159c..9e3c33b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -26,16 +26,16 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,14 +44,18 @@ import java.io.Serializable;
 import java.util.Properties;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
-
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 /**
  * A class containing a special Kafka broker which has a log retention of only 250 ms.
  * This way, we can make sure our consumer is properly handling cases where we run into out of offset
  * errors
  */
+@SuppressWarnings("serial")
 public class KafkaShortRetentionTestBase implements Serializable {
+       
 	protected static final Logger LOG = LoggerFactory.getLogger(KafkaShortRetentionTestBase.class);
+       
        private static KafkaTestEnvironment kafkaServer;
        private static Properties standardProps;
        private static ForkableFlinkMiniCluster flink;
@@ -108,7 +112,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
                final String topic = "auto-offset-reset-test";
 
                final int parallelism = 1;
-		final int elementsPerPartition = 50000; // with a sleep time of 1 ms per element, test should run for 50 s
+               final int elementsPerPartition = 50000;
 
                Properties tprops = new Properties();
                tprops.setProperty("retention.ms", "250");
@@ -162,6 +166,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
                kafkaServer.deleteTestTopic(topic);
        }
 
+       
 	private class NonContinousOffsetsDeserializationSchema implements KeyedDeserializationSchema<String> {
                private int numJumps;
                long nextExpected = 0;
@@ -205,12 +210,8 @@ public class KafkaShortRetentionTestBase implements Serializable {
         */
        public void runFailOnAutoOffsetResetNone() throws Exception {
                final String topic = "auto-offset-reset-none-test";
-
                final int parallelism = 1;
-		final int elementsPerPartition = 50000; // with a sleep time of 1 ms per element, test should run for 50 s
-               final int totalElements = parallelism * elementsPerPartition;
-
-
+               
                kafkaServer.createTestTopic(topic, parallelism, 1);
 
                final StreamExecutionEnvironment env =
@@ -218,8 +219,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
                env.setParallelism(parallelism);
 		env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
                env.getConfig().disableSysoutLogging();
-
-
+               
                // ----------- add consumer ----------
 
                Properties customProps = new Properties();
@@ -245,4 +245,27 @@ public class KafkaShortRetentionTestBase implements Serializable {
                kafkaServer.deleteTestTopic(topic);
        }
 
+       public void runFailOnAutoOffsetResetNoneEager() throws Exception {
+               final String topic = "auto-offset-reset-none-test";
+               final int parallelism = 1;
+
+               kafkaServer.createTestTopic(topic, parallelism, 1);
+
+               // ----------- add consumer ----------
+
+               Properties customProps = new Properties();
+               customProps.putAll(standardProps);
+		customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
+               
+               try {
+			kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);
+                       fail("should fail with an exception");
+               }
+               catch (IllegalArgumentException e) {
+                       // expected
+                       assertTrue(e.getMessage().contains("none"));
+               }
+
+               kafkaServer.deleteTestTopic(topic);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
new file mode 100644
index 0000000..0e16263
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import static org.junit.Assert.*;
+
+public class KafkaTopicPartitionTest {
+       
+       @Test
+       public void validateUid() {
+               Field uidField;
+               try {
+			uidField = KafkaTopicPartition.class.getDeclaredField("serialVersionUID");
+                       uidField.setAccessible(true);
+               }
+               catch (NoSuchFieldException e) {
+                       fail("serialVersionUID is not defined");
+                       return;
+               }
+               
+               assertTrue(Modifier.isStatic(uidField.getModifiers()));
+               assertTrue(Modifier.isFinal(uidField.getModifiers()));
+               assertTrue(Modifier.isPrivate(uidField.getModifiers()));
+               
+               assertEquals(long.class, uidField.getType());
+               
+		// the UID has to be constant to make sure old checkpoints/savepoints can be read
+               try {
+			assertEquals(722083576322742325L, uidField.getLong(null));
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}
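
The test pins serialVersionUID through reflection: the field must be private static final, of type long, and equal to a fixed constant, so that instances already written into checkpoints/savepoints stay deserializable. The declaration the assertions imply looks like this (a sketch; only the UID line is dictated by the test, the rest of the class is elided):

    import java.io.Serializable;

    public final class KafkaTopicPartition implements Serializable {

        // pinned so serialized instances in old checkpoints/savepoints stay readable
        private static final long serialVersionUID = 722083576322742325L;

        // ... fields and methods elided ...
    }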

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
index e94adb5..24822ed 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
@@ -36,30 +36,39 @@ public class JobManagerCommunicationUtils {
        
        
 	public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
+               JobStatusMessage status = null;
                
-               // find the jobID
-               Future<Object> listResponse = jobManager.ask(
-				JobManagerMessages.getRequestRunningJobsStatus(),
-                               askTimeout);
-
-               List<JobStatusMessage> jobs;
-               try {
-                       Object result = Await.result(listResponse, askTimeout);
-			jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-               }
-               catch (Exception e) {
-			throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
-               }
+               for (int i = 0; i < 200; i++) {
+                       // find the jobID
+                       Future<Object> listResponse = jobManager.ask(
+					JobManagerMessages.getRequestRunningJobsStatus(),
+                                       askTimeout);
+       
+                       List<JobStatusMessage> jobs;
+                       try {
+				Object result = Await.result(listResponse, askTimeout);
+				jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+                       }
+                       catch (Exception e) {
+				throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
+                       }
                
-               if (jobs.isEmpty()) {
-			throw new Exception("Could not cancel job - no running jobs");
-               }
-               if (jobs.size() != 1) {
-			throw new Exception("Could not cancel job - more than one running job.");
+                       if (jobs.isEmpty()) {
+                               // try again, fall through the loop
+                               Thread.sleep(50);
+                       }
+                       else if (jobs.size() == 1) {
+                               status = jobs.get(0);
+                       }
+                       else {
+				throw new Exception("Could not cancel job - more than one running job.");
+                       }
                }
                
-               JobStatusMessage status = jobs.get(0);
-               if (status.getJobState().isTerminalState()) {
+               if (status == null) {
+			throw new Exception("Could not cancel job - no running jobs");
+               }
+               else if (status.getJobState().isTerminalState()) {
 			throw new Exception("Could not cancel job - job is not running any more");
                }
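
cancelCurrentJob now polls for the job instead of failing on the first empty listing: up to 200 attempts, sleeping 50 ms whenever no job is visible yet. The pattern in isolation (a sketch, not the connector's exact code; listRunningJobs() is a placeholder for the ask/Await round trip, and this variant also ends the loop as soon as the job is found):

    // Illustrative bounded polling; imports and the JobManager query are elided.
    static JobStatusMessage waitForSingleRunningJob() throws Exception {
        JobStatusMessage status = null;
        for (int attempt = 0; attempt < 200 && status == null; attempt++) {
            List<JobStatusMessage> jobs = listRunningJobs(); // placeholder for the JobManager query
            if (jobs.isEmpty()) {
                Thread.sleep(50); // the job may not be submitted yet - retry
            }
            else if (jobs.size() == 1) {
                status = jobs.get(0); // found: the loop condition stops further attempts
            }
            else {
                throw new Exception("Could not cancel job - more than one running job.");
            }
        }
        if (status == null) {
            throw new Exception("Could not cancel job - no running jobs");
        }
        return status;
    }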
                

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index 17e2e6f..e74eee4 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
@@ -44,6 +43,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+@SuppressWarnings("deprecation")
 public class MockRuntimeContext extends StreamingRuntimeContext {
 
        private final int numberOfParallelSubtasks;
@@ -57,15 +57,6 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
                this.indexOfThisSubtask = indexOfThisSubtask;
        }
 
-	private static class MockStreamOperator extends AbstractStreamOperator<Integer> {
-		private static final long serialVersionUID = -1153976702711944427L;
-
-               @Override
-               public ExecutionConfig getExecutionConfig() {
-                       return new ExecutionConfig();
-               }
-       }
-
        @Override
        public boolean isCheckpointingEnabled() {
                return true;
@@ -152,12 +143,12 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
        }
 
        @Override
-	public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
+	public <S> org.apache.flink.api.common.state.OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
                throw new UnsupportedOperationException();
        }
 
        @Override
-	public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
+	public <S> org.apache.flink.api.common.state.OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
                throw new UnsupportedOperationException();
        }
 
@@ -175,4 +166,15 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 	public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
                throw new UnsupportedOperationException();
        }
+       
+	// ------------------------------------------------------------------------
+
+	private static class MockStreamOperator extends AbstractStreamOperator<Integer> {
+		private static final long serialVersionUID = -1153976702711944427L;
+
+               @Override
+               public ExecutionConfig getExecutionConfig() {
+                       return new ExecutionConfig();
+               }
+       }
 }
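
With the import of the deprecated OperatorState removed, the two remaining references use the fully qualified name, and a single class-level @SuppressWarnings("deprecation") covers them. The idiom in isolation (a sketch with placeholder names):

    // Fully qualifying a deprecated type avoids a deprecated import and keeps
    // the suppression visible on the class that still needs the type.
    @SuppressWarnings("deprecation")
    class LegacyStateMock {
        <S> org.apache.flink.api.common.state.OperatorState<S> getKeyValueState(String name) {
            throw new UnsupportedOperationException();
        }
    }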

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
index 4b17300..4388c9d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.functions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.watermark.Watermark;
 
+import javax.annotation.Nullable;
+
 /**
  * The {@code AssignerWithPeriodicWatermarks} assigns event time timestamps to elements,
  * and generates low watermarks that signal event time progress within the stream.
@@ -71,5 +73,6 @@ public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T>
         *
 	 * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
         */
+       @Nullable
        Watermark getCurrentWatermark();
 }
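
The @Nullable annotation makes explicit what the Javadoc already said: getCurrentWatermark() may return null to emit no watermark in the current period. A minimal sketch of an assigner that exercises the null path (MyEvent and the 1-second lateness bound are illustrative assumptions):

    import javax.annotation.Nullable;

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    // Illustrative event type - assumed, not part of Flink.
    class MyEvent {
        long timestamp;
    }

    class BoundedLatenessAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

        private long maxTimestamp = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(MyEvent element, long previousTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, element.timestamp);
            return element.timestamp;
        }

        @Override
        @Nullable
        public Watermark getCurrentWatermark() {
            // returning null emits no watermark for this period
            return maxTimestamp == Long.MIN_VALUE ? null : new Watermark(maxTimestamp - 1000);
        }
    }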

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java
index 48f29b2..5b5694c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.functions;
 
 import org.apache.flink.streaming.api.watermark.Watermark;
 
+import javax.annotation.Nullable;
+
 /**
  * The {@code AssignerWithPunctuatedWatermarks} assigns event time timestamps to elements,
  * and generates low watermarks that signal event time progress within the stream.
@@ -79,5 +81,6 @@ public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T
         *
 	 * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
         */
+       @Nullable
 	Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
 }
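
The same annotation lands on the punctuated variant: checkAndGetNextWatermark(...) may return null for elements that should not trigger a watermark. A matching sketch (again with an assumed event type, here carrying an explicit marker flag):

    import javax.annotation.Nullable;

    import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    // Assumed event type with an end-of-batch marker flag.
    class MarkedEvent {
        long timestamp;
        boolean isWatermarkMarker;
    }

    class MarkerPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MarkedEvent> {

        @Override
        public long extractTimestamp(MarkedEvent element, long previousTimestamp) {
            return element.timestamp;
        }

        @Override
        @Nullable
        public Watermark checkAndGetNextWatermark(MarkedEvent lastElement, long extractedTimestamp) {
            // null for ordinary elements; a watermark only at explicit markers
            return lastElement.isWatermarkMarker ? new Watermark(extractedTimestamp) : null;
        }
    }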
