http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
new file mode 100644
index 0000000..dccf698
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+/**
+ * A class containing a special Kafka broker which has a log retention of only 
250 ms.
+ * This way, we can make sure our consumer is properly handling cases where we 
run into out of offset
+ * errors
+ */
+@SuppressWarnings("serial")
+public class KafkaShortRetentionTestBase implements Serializable {
+       
+       protected static final Logger LOG = 
LoggerFactory.getLogger(KafkaShortRetentionTestBase.class);
+       
+       private static KafkaTestEnvironment kafkaServer;
+       private static Properties standardProps;
+       private static LocalFlinkMiniCluster flink;
+
+       @ClassRule
+       public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+       protected static Properties secureProps = new Properties();
+
+       @BeforeClass
+       public static void prepare() throws IOException, ClassNotFoundException 
{
+               
LOG.info("-------------------------------------------------------------------------");
+               LOG.info("    Starting KafkaShortRetentionTestBase ");
+               
LOG.info("-------------------------------------------------------------------------");
+
+               Configuration flinkConfig = new Configuration();
+
+               // dynamically load the implementation for the test
+               Class<?> clazz = 
Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
+               kafkaServer = (KafkaTestEnvironment) 
InstantiationUtil.instantiate(clazz);
+
+               LOG.info("Starting KafkaTestBase.prepare() for Kafka " + 
kafkaServer.getVersion());
+
+               if(kafkaServer.isSecureRunSupported()) {
+                       secureProps = kafkaServer.getSecureProperties();
+               }
+
+               Properties specificProperties = new Properties();
+               specificProperties.setProperty("log.retention.hours", "0");
+               specificProperties.setProperty("log.retention.minutes", "0");
+               specificProperties.setProperty("log.retention.ms", "250");
+               
specificProperties.setProperty("log.retention.check.interval.ms", "100");
+               kafkaServer.prepare(1, specificProperties, false);
+
+               standardProps = kafkaServer.getStandardProperties();
+
+               // start also a re-usable Flink mini cluster
+               
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+               
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+               
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+               
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
+
+               flink = new LocalFlinkMiniCluster(flinkConfig, false);
+               flink.start();
+       }
+
+       @AfterClass
+       public static void shutDownServices() {
+               if (flink != null) {
+                       flink.shutdown();
+               }
+               kafkaServer.shutdown();
+
+               secureProps.clear();
+       }
+
+       /**
+        * This test is concurrently reading and writing from a kafka topic.
+        * The job will run for a while
+        * In a special deserializationSchema, we make sure that the offsets 
from the topic
+        * are non-continuous (because the data is expiring faster than its 
consumed --> with auto.offset.reset = 'earliest', some offsets will not show up)
+        *
+        */
+       private static boolean stopProducer = false;
+
+       public void runAutoOffsetResetTest() throws Exception {
+               final String topic = "auto-offset-reset-test";
+
+               final int parallelism = 1;
+               final int elementsPerPartition = 50000;
+
+               Properties tprops = new Properties();
+               tprops.setProperty("retention.ms", "250");
+               kafkaServer.createTestTopic(topic, parallelism, 1, tprops);
+
+               final StreamExecutionEnvironment env =
+                               
StreamExecutionEnvironment.createRemoteEnvironment("localhost", 
flink.getLeaderRPCPort());
+               env.setParallelism(parallelism);
+               env.setRestartStrategy(RestartStrategies.noRestart()); // fail 
immediately
+               env.getConfig().disableSysoutLogging();
+
+
+               // ----------- add producer dataflow ----------
+
+
+               DataStream<String> stream = env.addSource(new 
RichParallelSourceFunction<String>() {
+
+                       private boolean running = true;
+
+                       @Override
+                       public void run(SourceContext<String> ctx) throws 
InterruptedException {
+                               int cnt = 
getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
+                               int limit = cnt + elementsPerPartition;
+
+
+                               while (running && !stopProducer && cnt < limit) 
{
+                                       ctx.collect("element-" + cnt);
+                                       cnt++;
+                                       Thread.sleep(10);
+                               }
+                               LOG.info("Stopping producer");
+                       }
+
+                       @Override
+                       public void cancel() {
+                               running = false;
+                       }
+               });
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
+               kafkaServer.produceIntoKafka(stream, topic, new 
KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, null);
+
+               // ----------- add consumer dataflow ----------
+
+               NonContinousOffsetsDeserializationSchema deserSchema = new 
NonContinousOffsetsDeserializationSchema();
+               FlinkKafkaConsumerBase<String> source = 
kafkaServer.getConsumer(topic, deserSchema, props);
+
+               DataStreamSource<String> consuming = env.addSource(source);
+               consuming.addSink(new DiscardingSink<String>());
+
+               tryExecute(env, "run auto offset reset test");
+
+               kafkaServer.deleteTestTopic(topic);
+       }
+
+       
+       private class NonContinousOffsetsDeserializationSchema implements 
KeyedDeserializationSchema<String> {
+               private int numJumps;
+               long nextExpected = 0;
+
+               @Override
+               public String deserialize(byte[] messageKey, byte[] message, 
String topic, int partition, long offset) throws IOException {
+                       if(offset != nextExpected) {
+                               numJumps++;
+                               nextExpected = offset;
+                               LOG.info("Registered now jump at offset {}", 
offset);
+                       }
+                       nextExpected++;
+                       try {
+                               Thread.sleep(10); // slow down data consumption 
to trigger log eviction
+                       } catch (InterruptedException e) {
+                               throw new RuntimeException("Stopping it");
+                       }
+                       return "";
+               }
+
+               @Override
+               public boolean isEndOfStream(String nextElement) {
+                       if( numJumps >= 5) {
+                               // we saw 5 jumps and no failures --> consumer 
can handle auto.offset.reset
+                               stopProducer = true;
+                               return true;
+                       }
+                       return false;
+               }
+
+               @Override
+               public TypeInformation<String> getProducedType() {
+                       return TypeInfoParser.parse("String");
+               }
+       }
+
+
+       /**
+        * Ensure that the consumer is properly failing if "auto.offset.reset" 
is set to "none"
+        * @throws Exception
+        */
+       public void runFailOnAutoOffsetResetNone() throws Exception {
+               final String topic = "auto-offset-reset-none-test";
+               final int parallelism = 1;
+               
+               kafkaServer.createTestTopic(topic, parallelism, 1);
+
+               final StreamExecutionEnvironment env =
+                               
StreamExecutionEnvironment.createRemoteEnvironment("localhost", 
flink.getLeaderRPCPort());
+               env.setParallelism(parallelism);
+               env.setRestartStrategy(RestartStrategies.noRestart()); // fail 
immediately
+               env.getConfig().disableSysoutLogging();
+               
+               // ----------- add consumer ----------
+
+               Properties customProps = new Properties();
+               customProps.putAll(standardProps);
+               customProps.putAll(secureProps);
+               customProps.setProperty("auto.offset.reset", "none"); // test 
that "none" leads to an exception
+               FlinkKafkaConsumerBase<String> source = 
kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);
+
+               DataStreamSource<String> consuming = env.addSource(source);
+               consuming.addSink(new DiscardingSink<String>());
+
+               try {
+                       env.execute("Test auto offset reset none");
+               } catch(Throwable e) {
+                       System.out.println("MESSAGE: " + 
e.getCause().getCause().getMessage());
+                       // check if correct exception has been thrown
+                       
if(!e.getCause().getCause().getMessage().contains("Unable to find previous 
offset")  // kafka 0.8
+                        && 
!e.getCause().getCause().getMessage().contains("Undefined offset with no reset 
policy for partition") // kafka 0.9
+                                       ) {
+                               throw e;
+                       }
+               }
+
+               kafkaServer.deleteTestTopic(topic);
+       }
+
+       public void runFailOnAutoOffsetResetNoneEager() throws Exception {
+               final String topic = "auto-offset-reset-none-test";
+               final int parallelism = 1;
+
+               kafkaServer.createTestTopic(topic, parallelism, 1);
+
+               // ----------- add consumer ----------
+
+               Properties customProps = new Properties();
+               customProps.putAll(standardProps);
+               customProps.putAll(secureProps);
+               customProps.setProperty("auto.offset.reset", "none"); // test 
that "none" leads to an exception
+               
+               try {
+                       kafkaServer.getConsumer(topic, new 
SimpleStringSchema(), customProps);
+                       fail("should fail with an exception");
+               }
+               catch (IllegalArgumentException e) {
+                       // expected
+                       assertTrue(e.getMessage().contains("none"));
+               }
+
+               kafkaServer.deleteTestTopic(topic);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
new file mode 100644
index 0000000..ae0af52
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+public abstract class KafkaTableSinkTestBase {
+
+       private static final String TOPIC = "testTopic";
+       protected static final String[] FIELD_NAMES = new String[] {"field1", 
"field2"};
+       private static final TypeInformation[] FIELD_TYPES = 
TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
+       private static final KafkaPartitioner<Row> PARTITIONER = new 
CustomPartitioner();
+       private static final Properties PROPERTIES = createSinkProperties();
+       @SuppressWarnings("unchecked")
+       private final FlinkKafkaProducerBase<Row> PRODUCER = new 
FlinkKafkaProducerBase<Row>(
+               TOPIC, new 
KeyedSerializationSchemaWrapper(getSerializationSchema()), PROPERTIES, 
PARTITIONER) {
+
+               @Override
+               protected void flush() {}
+       };
+
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testKafkaTableSink() throws Exception {
+               DataStream dataStream = mock(DataStream.class);
+
+               KafkaTableSink kafkaTableSink = spy(createTableSink());
+               kafkaTableSink.emitDataStream(dataStream);
+
+               verify(dataStream).addSink(eq(PRODUCER));
+
+               verify(kafkaTableSink).createKafkaProducer(
+                       eq(TOPIC),
+                       eq(PROPERTIES),
+                       any(getSerializationSchema().getClass()),
+                       eq(PARTITIONER));
+       }
+
+       @Test
+       public void testConfiguration() {
+               KafkaTableSink kafkaTableSink = createTableSink();
+               KafkaTableSink newKafkaTableSink = 
kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
+               assertNotSame(kafkaTableSink, newKafkaTableSink);
+
+               assertArrayEquals(FIELD_NAMES, 
newKafkaTableSink.getFieldNames());
+               assertArrayEquals(FIELD_TYPES, 
newKafkaTableSink.getFieldTypes());
+               assertEquals(new RowTypeInfo(FIELD_TYPES), 
newKafkaTableSink.getOutputType());
+       }
+
+       protected abstract KafkaTableSink createTableSink(String topic, 
Properties properties,
+                       KafkaPartitioner<Row> partitioner, 
FlinkKafkaProducerBase<Row> kafkaProducer);
+
+       protected abstract SerializationSchema<Row> getSerializationSchema();
+
+       private KafkaTableSink createTableSink() {
+               return createTableSink(TOPIC, PROPERTIES, PARTITIONER, 
PRODUCER);
+       }
+
+       private static Properties createSinkProperties() {
+               Properties properties = new Properties();
+               properties.setProperty("bootstrap.servers", "localhost:12345");
+               return properties;
+       }
+
+       private static class CustomPartitioner extends KafkaPartitioner<Row> 
implements Serializable {
+               @Override
+               public int partition(Row next, byte[] serializedKey, byte[] 
serializedValue, int numPartitions) {
+                       return 0;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
new file mode 100644
index 0000000..2a281e8
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+public abstract class KafkaTableSourceTestBase {
+
+       private static final String TOPIC = "testTopic";
+       private static final String[] FIELD_NAMES = new String[] { "long", 
"string", "boolean", "double", "missing-field" };
+       private static final TypeInformation<?>[] FIELD_TYPES = new 
TypeInformation<?>[] {
+               BasicTypeInfo.LONG_TYPE_INFO,
+               BasicTypeInfo.STRING_TYPE_INFO,
+               BasicTypeInfo.BOOLEAN_TYPE_INFO,
+               BasicTypeInfo.DOUBLE_TYPE_INFO,
+               BasicTypeInfo.LONG_TYPE_INFO };
+       private static final Properties PROPERTIES = createSourceProperties();
+
+       @Test
+       public void testKafkaTableSource() {
+               KafkaTableSource kafkaTableSource = spy(createTableSource());
+               StreamExecutionEnvironment env = 
mock(StreamExecutionEnvironment.class);
+               kafkaTableSource.getDataStream(env);
+
+               verify(env).addSource(any(getFlinkKafkaConsumer()));
+
+               verify(kafkaTableSource).getKafkaConsumer(
+                       eq(TOPIC),
+                       eq(PROPERTIES),
+                       any(getDeserializationSchema()));
+       }
+
+       protected abstract KafkaTableSource createTableSource(String topic, 
Properties properties,
+                       String[] fieldNames, TypeInformation<?>[] typeInfo);
+
+       protected abstract Class<DeserializationSchema<Row>> 
getDeserializationSchema();
+
+       protected abstract Class<FlinkKafkaConsumerBase<Row>> 
getFlinkKafkaConsumer();
+
+       private KafkaTableSource createTableSource() {
+               return createTableSource(TOPIC, PROPERTIES, FIELD_NAMES, 
FIELD_TYPES);
+       }
+
+       private static Properties createSourceProperties() {
+               Properties properties = new Properties();
+               properties.setProperty("zookeeper.connect", "dummy");
+               properties.setProperty("group.id", "dummy");
+               return properties;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
new file mode 100644
index 0000000..5cec4f0
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.jmx.JMXReporter;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * The base for the Kafka tests. It brings up:
+ * <ul>
+ *     <li>A ZooKeeper mini cluster</li>
+ *     <li>Three Kafka Brokers (mini clusters)</li>
+ *     <li>A Flink mini cluster</li>
+ * </ul>
+ * 
+ * <p>Code in this test is based on the following GitHub repository:
+ * <a href="https://github.com/sakserv/hadoop-mini-clusters";>
+ *   https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed),
+ * as per commit <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i></p>
+ */
+@SuppressWarnings("serial")
+public abstract class KafkaTestBase extends TestLogger {
+
+       protected static final Logger LOG = 
LoggerFactory.getLogger(KafkaTestBase.class);
+       
+       protected static final int NUMBER_OF_KAFKA_SERVERS = 3;
+
+       protected static String brokerConnectionStrings;
+
+       protected static Properties standardProps;
+       
+       protected static LocalFlinkMiniCluster flink;
+
+       protected static int flinkPort;
+
+       protected static FiniteDuration timeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
+
+       protected static KafkaTestEnvironment kafkaServer;
+
+       @ClassRule
+       public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+       protected static Properties secureProps = new Properties();
+
+       // 
------------------------------------------------------------------------
+       //  Setup and teardown of the mini clusters
+       // 
------------------------------------------------------------------------
+       
+       @BeforeClass
+       public static void prepare() throws IOException, ClassNotFoundException 
{
+
+               
LOG.info("-------------------------------------------------------------------------");
+               LOG.info("    Starting KafkaTestBase ");
+               
LOG.info("-------------------------------------------------------------------------");
+
+               startClusters(false);
+
+       }
+
+       @AfterClass
+       public static void shutDownServices() {
+
+               
LOG.info("-------------------------------------------------------------------------");
+               LOG.info("    Shut down KafkaTestBase ");
+               
LOG.info("-------------------------------------------------------------------------");
+
+               shutdownClusters();
+
+               
LOG.info("-------------------------------------------------------------------------");
+               LOG.info("    KafkaTestBase finished");
+               
LOG.info("-------------------------------------------------------------------------");
+       }
+
+       protected static Configuration getFlinkConfiguration() {
+               Configuration flinkConfig = new Configuration();
+               
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+               
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+               
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+               
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
+               flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"my_reporter");
+               flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
JMXReporter.class.getName());
+               return flinkConfig;
+       }
+
+       protected static void startClusters(boolean secureMode) throws 
ClassNotFoundException {
+
+               // dynamically load the implementation for the test
+               Class<?> clazz = 
Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
+               kafkaServer = (KafkaTestEnvironment) 
InstantiationUtil.instantiate(clazz);
+
+               LOG.info("Starting KafkaTestBase.prepare() for Kafka " + 
kafkaServer.getVersion());
+
+               kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode);
+
+               standardProps = kafkaServer.getStandardProperties();
+
+               brokerConnectionStrings = 
kafkaServer.getBrokerConnectionString();
+
+               if (secureMode) {
+                       if (!kafkaServer.isSecureRunSupported()) {
+                               throw new IllegalStateException(
+                                       "Attempting to test in secure mode but 
secure mode not supported by the KafkaTestEnvironment.");
+                       }
+                       secureProps = kafkaServer.getSecureProperties();
+               }
+
+               // start also a re-usable Flink mini cluster
+               flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), 
false);
+               flink.start();
+
+               flinkPort = flink.getLeaderRPCPort();
+
+       }
+
+       protected static void shutdownClusters() {
+
+               flinkPort = -1;
+               if (flink != null) {
+                       flink.shutdown();
+               }
+
+               if(secureProps != null) {
+                       secureProps.clear();
+               }
+
+               kafkaServer.shutdown();
+
+       }
+
+
+
+       // 
------------------------------------------------------------------------
+       //  Execution utilities
+       // 
------------------------------------------------------------------------
+       
+
+       protected static void 
tryExecutePropagateExceptions(StreamExecutionEnvironment see, String name) 
throws Exception {
+               try {
+                       see.execute(name);
+               }
+               catch (ProgramInvocationException | JobExecutionException root) 
{
+                       Throwable cause = root.getCause();
+
+                       // search for nested SuccessExceptions
+                       int depth = 0;
+                       while (!(cause instanceof SuccessException)) {
+                               if (cause == null || depth++ == 20) {
+                                       throw root;
+                               }
+                               else {
+                                       cause = cause.getCause();
+                               }
+                       }
+               }
+       }
+
+       protected static void createTestTopic(String topic, int 
numberOfPartitions, int replicationFactor) {
+               kafkaServer.createTestTopic(topic, numberOfPartitions, 
replicationFactor);
+       }
+       
+       protected static void deleteTestTopic(String topic) {
+               kafkaServer.deleteTestTopic(topic);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
new file mode 100644
index 0000000..10c7b86
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.server.KafkaServer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Properties;
+
+/**
+ * Abstract class providing a Kafka test environment
+ */
+public abstract class KafkaTestEnvironment {
+
+       protected static final String KAFKA_HOST = "localhost";
+
+       public abstract void prepare(int numKafkaServers, Properties 
kafkaServerProperties, boolean secureMode);
+
+       public void prepare(int numberOfKafkaServers, boolean secureMode) {
+               this.prepare(numberOfKafkaServers, null, secureMode);
+       }
+
+       public abstract void shutdown();
+
+       public abstract void deleteTestTopic(String topic);
+
+       public abstract void createTestTopic(String topic, int 
numberOfPartitions, int replicationFactor, Properties properties);
+
+       public void createTestTopic(String topic, int numberOfPartitions, int 
replicationFactor) {
+               this.createTestTopic(topic, numberOfPartitions, 
replicationFactor, new Properties());
+       }
+
+       public abstract Properties getStandardProperties();
+
+       public abstract Properties getSecureProperties();
+
+       public abstract String getBrokerConnectionString();
+
+       public abstract String getVersion();
+
+       public abstract List<KafkaServer> getBrokers();
+
+       // -- consumer / producer instances:
+       public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, 
DeserializationSchema<T> deserializationSchema, Properties props) {
+               return getConsumer(topics, new 
KeyedDeserializationSchemaWrapper<T>(deserializationSchema), props);
+       }
+
+       public <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, 
KeyedDeserializationSchema<T> readSchema, Properties props) {
+               return getConsumer(Collections.singletonList(topic), 
readSchema, props);
+       }
+
+       public <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, 
DeserializationSchema<T> deserializationSchema, Properties props) {
+               return getConsumer(Collections.singletonList(topic), 
deserializationSchema, props);
+       }
+
+       public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> 
topics, KeyedDeserializationSchema<T> readSchema, Properties props);
+
+       public abstract <T> StreamSink<T> getProducerSink(String topic,
+                       KeyedSerializationSchema<T> serSchema, Properties props,
+                       KafkaPartitioner<T> partitioner);
+
+       public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> 
stream, String topic,
+                                                                               
                                KeyedSerializationSchema<T> serSchema, 
Properties props,
+                                                                               
                                KafkaPartitioner<T> partitioner);
+
+       // -- offset handlers
+
+       public interface KafkaOffsetHandler {
+               Long getCommittedOffset(String topicName, int partition);
+               void close();
+       }
+
+       public abstract KafkaOffsetHandler createOffsetHandler(Properties 
props);
+
+       // -- leader failure simulation
+
+       public abstract void restartBroker(int leaderId) throws Exception;
+
+       public abstract int getLeaderToShutDown(String topic) throws Exception;
+
+       public abstract int getBrokerId(KafkaServer server);
+
+       public abstract boolean isSecureRunSupported();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
new file mode 100644
index 0000000..5dab05a
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Assert;
+import org.junit.Test;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+
+public class TestFixedPartitioner {
+
+
+       /**
+        * <pre>
+        *              Flink Sinks:            Kafka Partitions
+        *                      1       ---------------->       1
+        *                      2   --------------/
+        *                      3   -------------/
+        *                      4       ------------/
+        * </pre>
+        */
+       @Test
+       public void testMoreFlinkThanBrokers() {
+               FixedPartitioner<String> part = new FixedPartitioner<>();
+
+               int[] partitions = new int[]{0};
+
+               part.open(0, 4, partitions);
+               Assert.assertEquals(0, part.partition("abc1", null, null, 
partitions.length));
+
+               part.open(1, 4, partitions);
+               Assert.assertEquals(0, part.partition("abc2", null, null, 
partitions.length));
+
+               part.open(2, 4, partitions);
+               Assert.assertEquals(0, part.partition("abc3", null, null, 
partitions.length));
+               Assert.assertEquals(0, part.partition("abc3", null, null, 
partitions.length)); // check if it is changing ;)
+
+               part.open(3, 4, partitions);
+               Assert.assertEquals(0, part.partition("abc4", null, null, 
partitions.length));
+       }
+
+       /**
+        *
+        * <pre>
+        *              Flink Sinks:            Kafka Partitions
+        *                      1       ---------------->       1
+        *                      2       ---------------->       2
+        *                                                                      
3
+        *                                                                      
4
+        *                                                                      
5
+        *
+        * </pre>
+        */
+       @Test
+       public void testFewerPartitions() {
+               FixedPartitioner<String> part = new FixedPartitioner<>();
+
+               int[] partitions = new int[]{0, 1, 2, 3, 4};
+               part.open(0, 2, partitions);
+               Assert.assertEquals(0, part.partition("abc1", null, null, 
partitions.length));
+               Assert.assertEquals(0, part.partition("abc1", null, null, 
partitions.length));
+
+               part.open(1, 2, partitions);
+               Assert.assertEquals(1, part.partition("abc1", null, null, 
partitions.length));
+               Assert.assertEquals(1, part.partition("abc1", null, null, 
partitions.length));
+       }
+
+       /*
+        *              Flink Sinks:            Kafka Partitions
+        *                      1       ------------>--->       1
+        *                      2       -----------/---->       2
+        *                      3       ----------/
+        */
+       @Test
+       public void testMixedCase() {
+               FixedPartitioner<String> part = new FixedPartitioner<>();
+               int[] partitions = new int[]{0,1};
+
+               part.open(0, 3, partitions);
+               Assert.assertEquals(0, part.partition("abc1", null, null, 
partitions.length));
+
+               part.open(1, 3, partitions);
+               Assert.assertEquals(1, part.partition("abc1", null, null, 
partitions.length));
+
+               part.open(2, 3, partitions);
+               Assert.assertEquals(0, part.partition("abc1", null, null, 
partitions.length));
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
new file mode 100644
index 0000000..0b3507a
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class AbstractFetcherTimestampsTest {
+       
+       @Test
+       public void testPunctuatedWatermarks() throws Exception {
+               final String testTopic = "test topic name";
+               List<KafkaTopicPartition> originalPartitions = Arrays.asList(
+                               new KafkaTopicPartition(testTopic, 7),
+                               new KafkaTopicPartition(testTopic, 13),
+                               new KafkaTopicPartition(testTopic, 21));
+
+               TestSourceContext<Long> sourceContext = new 
TestSourceContext<>();
+
+               TestProcessingTimeService processingTimeProvider = new 
TestProcessingTimeService();
+
+               TestFetcher<Long> fetcher = new TestFetcher<>(
+                               sourceContext,
+                               originalPartitions,
+                               null, /* periodic watermark assigner */
+                               new 
SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new 
PunctuatedTestExtractor()),
+                               processingTimeProvider,
+                               0);
+
+               final KafkaTopicPartitionState<Object> part1 = 
fetcher.subscribedPartitions()[0];
+               final KafkaTopicPartitionState<Object> part2 = 
fetcher.subscribedPartitions()[1];
+               final KafkaTopicPartitionState<Object> part3 = 
fetcher.subscribedPartitions()[2];
+
+               // elements generate a watermark if the timestamp is a multiple 
of three
+               
+               // elements for partition 1
+               fetcher.emitRecord(1L, part1, 1L);
+               fetcher.emitRecord(2L, part1, 2L);
+               fetcher.emitRecord(3L, part1, 3L);
+               assertEquals(3L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(3L, 
sourceContext.getLatestElement().getTimestamp());
+               assertFalse(sourceContext.hasWatermark());
+
+               // elements for partition 2
+               fetcher.emitRecord(12L, part2, 1L);
+               assertEquals(12L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(12L, 
sourceContext.getLatestElement().getTimestamp());
+               assertFalse(sourceContext.hasWatermark());
+
+               // elements for partition 3
+               fetcher.emitRecord(101L, part3, 1L);
+               fetcher.emitRecord(102L, part3, 2L);
+               assertEquals(102L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(102L, 
sourceContext.getLatestElement().getTimestamp());
+               
+               // now, we should have a watermark
+               assertTrue(sourceContext.hasWatermark());
+               assertEquals(3L, 
sourceContext.getLatestWatermark().getTimestamp());
+               
+               // advance partition 3
+               fetcher.emitRecord(1003L, part3, 3L);
+               fetcher.emitRecord(1004L, part3, 4L);
+               fetcher.emitRecord(1005L, part3, 5L);
+               assertEquals(1005L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(1005L, 
sourceContext.getLatestElement().getTimestamp());
+
+               // advance partition 1 beyond partition 2 - this bumps the 
watermark
+               fetcher.emitRecord(30L, part1, 4L);
+               assertEquals(30L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(30L, 
sourceContext.getLatestElement().getTimestamp());
+               assertTrue(sourceContext.hasWatermark());
+               assertEquals(12L, 
sourceContext.getLatestWatermark().getTimestamp());
+
+               // advance partition 2 again - this bumps the watermark
+               fetcher.emitRecord(13L, part2, 2L);
+               assertFalse(sourceContext.hasWatermark());
+               fetcher.emitRecord(14L, part2, 3L);
+               assertFalse(sourceContext.hasWatermark());
+               fetcher.emitRecord(15L, part2, 3L);
+               assertTrue(sourceContext.hasWatermark());
+               assertEquals(15L, 
sourceContext.getLatestWatermark().getTimestamp());
+       }
+       
+       @Test
+       public void testPeriodicWatermarks() throws Exception {
+               final String testTopic = "test topic name";
+               List<KafkaTopicPartition> originalPartitions = Arrays.asList(
+                               new KafkaTopicPartition(testTopic, 7),
+                               new KafkaTopicPartition(testTopic, 13),
+                               new KafkaTopicPartition(testTopic, 21));
+
+               TestSourceContext<Long> sourceContext = new 
TestSourceContext<>();
+
+               TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
+
+               TestFetcher<Long> fetcher = new TestFetcher<>(
+                               sourceContext,
+                               originalPartitions,
+                               new 
SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new 
PeriodicTestExtractor()),
+                               null, /* punctuated watermarks assigner*/
+                               processingTimeService,
+                               10);
+
+               final KafkaTopicPartitionState<Object> part1 = 
fetcher.subscribedPartitions()[0];
+               final KafkaTopicPartitionState<Object> part2 = 
fetcher.subscribedPartitions()[1];
+               final KafkaTopicPartitionState<Object> part3 = 
fetcher.subscribedPartitions()[2];
+
+               // elements generate a watermark if the timestamp is a multiple 
of three
+
+               // elements for partition 1
+               fetcher.emitRecord(1L, part1, 1L);
+               fetcher.emitRecord(2L, part1, 2L);
+               fetcher.emitRecord(3L, part1, 3L);
+               assertEquals(3L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(3L, 
sourceContext.getLatestElement().getTimestamp());
+
+               // elements for partition 2
+               fetcher.emitRecord(12L, part2, 1L);
+               assertEquals(12L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(12L, 
sourceContext.getLatestElement().getTimestamp());
+
+               // elements for partition 3
+               fetcher.emitRecord(101L, part3, 1L);
+               fetcher.emitRecord(102L, part3, 2L);
+               assertEquals(102L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(102L, 
sourceContext.getLatestElement().getTimestamp());
+
+               processingTimeService.setCurrentTime(10);
+
+               // now, we should have a watermark (this blocks until the 
periodic thread emitted the watermark)
+               assertEquals(3L, 
sourceContext.getLatestWatermark().getTimestamp());
+
+               // advance partition 3
+               fetcher.emitRecord(1003L, part3, 3L);
+               fetcher.emitRecord(1004L, part3, 4L);
+               fetcher.emitRecord(1005L, part3, 5L);
+               assertEquals(1005L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(1005L, 
sourceContext.getLatestElement().getTimestamp());
+
+               // advance partition 1 beyond partition 2 - this bumps the 
watermark
+               fetcher.emitRecord(30L, part1, 4L);
+               assertEquals(30L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(30L, 
sourceContext.getLatestElement().getTimestamp());
+
+               processingTimeService.setCurrentTime(20);
+
+               // this blocks until the periodic thread emitted the watermark
+               assertEquals(12L, 
sourceContext.getLatestWatermark().getTimestamp());
+
+               // advance partition 2 again - this bumps the watermark
+               fetcher.emitRecord(13L, part2, 2L);
+               fetcher.emitRecord(14L, part2, 3L);
+               fetcher.emitRecord(15L, part2, 3L);
+
+               processingTimeService.setCurrentTime(30);
+               // this blocks until the periodic thread emitted the watermark
+               long watermarkTs = 
sourceContext.getLatestWatermark().getTimestamp();
+               assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Test mocks
+       // 
------------------------------------------------------------------------
+
+       private static final class TestFetcher<T> extends AbstractFetcher<T, 
Object> {
+
+               protected TestFetcher(
+                               SourceContext<T> sourceContext,
+                               List<KafkaTopicPartition> assignedPartitions,
+                               
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+                               
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+                               ProcessingTimeService processingTimeProvider,
+                               long autoWatermarkInterval) throws Exception
+               {
+                       super(sourceContext, assignedPartitions, 
watermarksPeriodic, watermarksPunctuated, processingTimeProvider, 
autoWatermarkInterval, TestFetcher.class.getClassLoader(), false);
+               }
+
+               @Override
+               public void runFetchLoop() throws Exception {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void cancel() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public Object createKafkaPartitionHandle(KafkaTopicPartition 
partition) {
+                       return new Object();
+               }
+
+               @Override
+               public void 
commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws 
Exception {
+                       throw new UnsupportedOperationException();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static final class TestSourceContext<T> implements 
SourceContext<T> {
+
+               private final Object checkpointLock = new Object();
+               private final Object watermarkLock = new Object();
+
+               private volatile StreamRecord<T> latestElement;
+               private volatile Watermark currentWatermark;
+
+               @Override
+               public void collect(T element) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void collectWithTimestamp(T element, long timestamp) {
+                       this.latestElement = new StreamRecord<>(element, 
timestamp);
+               }
+
+               @Override
+               public void emitWatermark(Watermark mark) {
+                       synchronized (watermarkLock) {
+                               currentWatermark = mark;
+                               watermarkLock.notifyAll();
+                       }
+               }
+
+               @Override
+               public Object getCheckpointLock() {
+                       return checkpointLock;
+               }
+
+               @Override
+               public void close() {}
+
+               public StreamRecord<T> getLatestElement() {
+                       return latestElement;
+               }
+
+               public boolean hasWatermark() {
+                       return currentWatermark != null;
+               }
+               
+               public Watermark getLatestWatermark() throws 
InterruptedException {
+                       synchronized (watermarkLock) {
+                               while (currentWatermark == null) {
+                                       watermarkLock.wait();
+                               }
+                               Watermark wm = currentWatermark;
+                               currentWatermark = null;
+                               return wm;
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static class PeriodicTestExtractor implements 
AssignerWithPeriodicWatermarks<Long> {
+
+               private volatile long maxTimestamp = Long.MIN_VALUE;
+               
+               @Override
+               public long extractTimestamp(Long element, long 
previousElementTimestamp) {
+                       maxTimestamp = Math.max(maxTimestamp, element);
+                       return element;
+               }
+
+               @Nullable
+               @Override
+               public Watermark getCurrentWatermark() {
+                       return new Watermark(maxTimestamp);
+               }
+       }
+
+       private static class PunctuatedTestExtractor implements 
AssignerWithPunctuatedWatermarks<Long> {
+
+               @Override
+               public long extractTimestamp(Long element, long 
previousElementTimestamp) {
+                       return element;
+               }
+
+               @Nullable
+               @Override
+               public Watermark checkAndGetNextWatermark(Long lastElement, 
long extractedTimestamp) {
+                       return extractedTimestamp % 3 == 0 ? new 
Watermark(extractedTimestamp) : null;
+               }
+               
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
new file mode 100644
index 0000000..0e16263
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import static org.junit.Assert.*;
+
+public class KafkaTopicPartitionTest {
+       
+       @Test
+       public void validateUid() {
+               Field uidField;
+               try {
+                       uidField = 
KafkaTopicPartition.class.getDeclaredField("serialVersionUID");
+                       uidField.setAccessible(true);
+               }
+               catch (NoSuchFieldException e) {
+                       fail("serialVersionUID is not defined");
+                       return;
+               }
+               
+               assertTrue(Modifier.isStatic(uidField.getModifiers()));
+               assertTrue(Modifier.isFinal(uidField.getModifiers()));
+               assertTrue(Modifier.isPrivate(uidField.getModifiers()));
+               
+               assertEquals(long.class, uidField.getType());
+               
+               // the UID has to be constant to make sure old 
checkpoints/savepoints can be read 
+               try {
+                       assertEquals(722083576322742325L, 
uidField.getLong(null));
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
new file mode 100644
index 0000000..9e8e1d9
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import 
org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
+import java.util.Collection;
+import java.util.Properties;
+import java.util.Random;
+
+import static org.mockito.Mockito.mock;
+
+@SuppressWarnings("serial")
+public class DataGenerators {
+
+       public static void 
generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
+                                                                               
                                 KafkaTestEnvironment testServer, String topic,
+                                                                               
                                 final int numPartitions,
+                                                                               
                                 final int numElements,
+                                                                               
                                 final boolean randomizeOrder) throws Exception 
{
+               env.setParallelism(numPartitions);
+               env.getConfig().disableSysoutLogging();
+               env.setRestartStrategy(RestartStrategies.noRestart());
+
+               DataStream<Integer> stream = env.addSource(
+                               new RichParallelSourceFunction<Integer>() {
+
+                                       private volatile boolean running = true;
+
+                                       @Override
+                                       public void run(SourceContext<Integer> 
ctx) {
+                                               // create a sequence
+                                               int[] elements = new 
int[numElements];
+                                               for (int i = 0, val = 
getRuntimeContext().getIndexOfThisSubtask();
+                                                        i < numElements;
+                                                        i++, val += 
getRuntimeContext().getNumberOfParallelSubtasks()) {
+
+                                                       elements[i] = val;
+                                               }
+
+                                               // scramble the sequence
+                                               if (randomizeOrder) {
+                                                       Random rnd = new 
Random();
+                                                       for (int i = 0; i < 
elements.length; i++) {
+                                                               int otherPos = 
rnd.nextInt(elements.length);
+
+                                                               int tmp = 
elements[i];
+                                                               elements[i] = 
elements[otherPos];
+                                                               
elements[otherPos] = tmp;
+                                                       }
+                                               }
+
+                                               // emit the sequence
+                                               int pos = 0;
+                                               while (running && pos < 
elements.length) {
+                                                       
ctx.collect(elements[pos++]);
+                                               }
+                                       }
+
+                                       @Override
+                                       public void cancel() {
+                                               running = false;
+                                       }
+                               });
+
+               Properties props = new Properties();
+               
props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()));
+               Properties secureProps = testServer.getSecureProperties();
+               if(secureProps != null) {
+                       props.putAll(testServer.getSecureProperties());
+               }
+
+               stream = stream.rebalance();
+               testServer.produceIntoKafka(stream, topic,
+                               new KeyedSerializationSchemaWrapper<>(new 
TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, 
env.getConfig())),
+                               props,
+                               new KafkaPartitioner<Integer>() {
+                                       @Override
+                                       public int partition(Integer next, 
byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+                                               return next % numPartitions;
+                                       }
+                               });
+
+               env.execute("Scrambles int sequence generator");
+       }
+
+       // 
------------------------------------------------------------------------
+
+       public static class InfiniteStringsGenerator extends Thread {
+
+               private final KafkaTestEnvironment server;
+
+               private final String topic;
+
+               private volatile Throwable error;
+
+               private volatile boolean running = true;
+
+
+               public InfiniteStringsGenerator(KafkaTestEnvironment server, 
String topic) {
+                       this.server = server;
+                       this.topic = topic;
+               }
+
+               @Override
+               public void run() {
+                       // we manually feed data into the Kafka sink
+                       RichFunction producer = null;
+                       try {
+                               Properties producerProperties = 
FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString());
+                               producerProperties.setProperty("retries", "3");
+                               StreamTransformation<String> mockTransform = 
new MockStreamTransformation();
+                               DataStream<String> stream = new 
DataStream<>(new DummyStreamExecutionEnvironment(), mockTransform);
+
+                               StreamSink<String> sink = 
server.getProducerSink(
+                                               topic,
+                                               new 
KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
+                                               producerProperties,
+                                               new FixedPartitioner<String>());
+
+                               OneInputStreamOperatorTestHarness<String, 
Object> testHarness =
+                                               new 
OneInputStreamOperatorTestHarness<>(sink);
+
+                               testHarness.open();
+
+                               final StringBuilder bld = new StringBuilder();
+                               final Random rnd = new Random();
+
+                               while (running) {
+                                       bld.setLength(0);
+
+                                       int len = rnd.nextInt(100) + 1;
+                                       for (int i = 0; i < len; i++) {
+                                               bld.append((char) 
(rnd.nextInt(20) + 'a') );
+                                       }
+
+                                       String next = bld.toString();
+                                       testHarness.processElement(new 
StreamRecord<>(next));
+                               }
+                       }
+                       catch (Throwable t) {
+                               this.error = t;
+                       }
+                       finally {
+                               if (producer != null) {
+                                       try {
+                                               producer.close();
+                                       }
+                                       catch (Throwable t) {
+                                               // ignore
+                                       }
+                               }
+                       }
+               }
+
+               public void shutdown() {
+                       this.running = false;
+                       this.interrupt();
+               }
+
+               public Throwable getError() {
+                       return this.error;
+               }
+
+               private static class MockStreamTransformation extends 
StreamTransformation<String> {
+                       public MockStreamTransformation() {
+                               super("MockTransform", 
TypeInfoParser.<String>parse("String"), 1);
+                       }
+
+                       @Override
+                       public void setChainingStrategy(ChainingStrategy 
strategy) {
+
+                       }
+
+                       @Override
+                       public Collection<StreamTransformation<?>> 
getTransitivePredecessors() {
+                               return null;
+                       }
+               }
+
+               public static class DummyStreamExecutionEnvironment extends 
StreamExecutionEnvironment {
+
+                       @Override
+                       public JobExecutionResult execute(String jobName) 
throws Exception {
+                               return null;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
new file mode 100644
index 0000000..2bd400c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
+               Checkpointed<Integer>, CheckpointListener, Runnable {
+       
+       private static final Logger LOG = 
LoggerFactory.getLogger(FailingIdentityMapper.class);
+       
+       private static final long serialVersionUID = 6334389850158707313L;
+       
+       public static volatile boolean failedBefore;
+       public static volatile boolean hasBeenCheckpointedBeforeFailure;
+
+       private final int failCount;
+       private int numElementsTotal;
+       private int numElementsThisTime;
+       
+       private boolean failer;
+       private boolean hasBeenCheckpointed;
+       
+       private Thread printer;
+       private volatile boolean printerRunning = true;
+
+       public FailingIdentityMapper(int failCount) {
+               this.failCount = failCount;
+       }
+
+       @Override
+       public void open(Configuration parameters) {
+               failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+               printer = new Thread(this, "FailingIdentityMapper Status 
Printer");
+               printer.start();
+       }
+
+       @Override
+       public T map(T value) throws Exception {
+               numElementsTotal++;
+               numElementsThisTime++;
+               
+               if (!failedBefore) {
+                       Thread.sleep(10);
+                       
+                       if (failer && numElementsTotal >= failCount) {
+                               hasBeenCheckpointedBeforeFailure = 
hasBeenCheckpointed;
+                               failedBefore = true;
+                               throw new Exception("Artificial Test Failure");
+                       }
+               }
+               return value;
+       }
+
+       @Override
+       public void close() throws Exception {
+               printerRunning = false;
+               if (printer != null) {
+                       printer.interrupt();
+                       printer = null;
+               }
+       }
+
+       @Override
+       public void notifyCheckpointComplete(long checkpointId) {
+               this.hasBeenCheckpointed = true;
+       }
+
+       @Override
+       public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
+               return numElementsTotal;
+       }
+
+       @Override
+       public void restoreState(Integer state) {
+               numElementsTotal = state;
+       }
+
+       @Override
+       public void run() {
+               while (printerRunning) {
+                       try {
+                               Thread.sleep(5000);
+                       }
+                       catch (InterruptedException e) {
+                               // ignore
+                       }
+                       LOG.info("============================> Failing mapper  
{}: count={}, totalCount={}",
+                                       
getRuntimeContext().getIndexOfThisSubtask(),
+                                       numElementsThisTime, numElementsTotal);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
new file mode 100644
index 0000000..055326d
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import java.util.Properties;
+
+public class FakeStandardProducerConfig {
+
+       public static Properties get() {
+               Properties p = new Properties();
+               p.setProperty("bootstrap.servers", "localhost:12345");
+               p.setProperty("key.serializer", 
ByteArraySerializer.class.getName());
+               p.setProperty("value.serializer", 
ByteArraySerializer.class.getName());
+               return p;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
new file mode 100644
index 0000000..acdad5a
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class JobManagerCommunicationUtils {
+
+       private static final FiniteDuration askTimeout = new FiniteDuration(30, 
TimeUnit.SECONDS);
+
+
+       public static void waitUntilNoJobIsRunning(ActorGateway jobManager) 
throws Exception {
+               while (true) {
+                       // find the jobID
+                       Future<Object> listResponse = jobManager.ask(
+                                       
JobManagerMessages.getRequestRunningJobsStatus(), askTimeout);
+
+                       Object result = Await.result(listResponse, askTimeout);
+                       List<JobStatusMessage> jobs = 
((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+
+
+                       if (jobs.isEmpty()) {
+                               return;
+                       }
+
+                       Thread.sleep(50);
+               }
+       }
+
+       public static void cancelCurrentJob(ActorGateway jobManager) throws 
Exception {
+               cancelCurrentJob(jobManager, null);
+       }
+
+       public static void cancelCurrentJob(ActorGateway jobManager, String 
name) throws Exception {
+               JobStatusMessage status = null;
+               
+               for (int i = 0; i < 200; i++) {
+                       // find the jobID
+                       Future<Object> listResponse = jobManager.ask(
+                                       
JobManagerMessages.getRequestRunningJobsStatus(),
+                                       askTimeout);
+       
+                       List<JobStatusMessage> jobs;
+                       try {
+                               Object result = Await.result(listResponse, 
askTimeout);
+                               jobs = ((JobManagerMessages.RunningJobsStatus) 
result).getStatusMessages();
+                       }
+                       catch (Exception e) {
+                               throw new Exception("Could not cancel job - 
failed to retrieve running jobs from the JobManager.", e);
+                       }
+               
+                       if (jobs.isEmpty()) {
+                               // try again, fall through the loop
+                               Thread.sleep(50);
+                       }
+                       else if (jobs.size() == 1) {
+                               status = jobs.get(0);
+                       }
+                       else if(name != null) {
+                               for(JobStatusMessage msg: jobs) {
+                                       if(msg.getJobName().equals(name)) {
+                                               status = msg;
+                                       }
+                               }
+                               if(status == null) {
+                                       throw new Exception("Could not cancel 
job - no job matched expected name = '" + name +"' in " + jobs);
+                               }
+                       } else {
+                               String jobNames = "";
+                               for(JobStatusMessage jsm: jobs) {
+                                       jobNames += jsm.getJobName() + ", ";
+                               }
+                               throw new Exception("Could not cancel job - 
more than one running job: " + jobNames);
+                       }
+               }
+               
+               if (status == null) {
+                       throw new Exception("Could not cancel job - no running 
jobs");  
+               }
+               else if (status.getJobState().isGloballyTerminalState()) {
+                       throw new Exception("Could not cancel job - job is not 
running any more");
+               }
+               
+               JobID jobId = status.getJobId();
+               
+               Future<Object> response = jobManager.ask(new 
JobManagerMessages.CancelJob(jobId), askTimeout);
+               try {
+                       Await.result(response, askTimeout);
+               }
+               catch (Exception e) {
+                       throw new Exception("Sending the 'cancel' message 
failed.", e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
new file mode 100644
index 0000000..e105e01
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+public class PartitionValidatingMapper implements MapFunction<Integer, 
Integer> {
+
+       private static final long serialVersionUID = 1088381231244959088L;
+       
+       /* the partitions from which this function received data */
+       private final Set<Integer> myPartitions = new HashSet<>();
+       
+       private final int numPartitions;
+       private final int maxPartitions;
+
+       public PartitionValidatingMapper(int numPartitions, int maxPartitions) {
+               this.numPartitions = numPartitions;
+               this.maxPartitions = maxPartitions;
+       }
+
+       @Override
+       public Integer map(Integer value) throws Exception {
+               // validate that the partitioning is identical
+               int partition = value % numPartitions;
+               myPartitions.add(partition);
+               if (myPartitions.size() > maxPartitions) {
+                       throw new Exception("Error: Elements from too many 
different partitions: " + myPartitions
+                                       + ". Expect elements only from " + 
maxPartitions + " partitions");
+               }
+               return value;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
new file mode 100644
index 0000000..1d61229
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+/**
+ * An identity map function that sleeps between elements, throttling the
+ * processing speed.
+ * 
+ * @param <T> The type mapped.
+ */
+public class ThrottledMapper<T> implements MapFunction<T,T> {
+
+       private static final long serialVersionUID = 467008933767159126L;
+
+       private final int sleep;
+
+       public ThrottledMapper(int sleep) {
+               this.sleep = sleep;
+       }
+
+       @Override
+       public T map(T value) throws Exception {
+               Thread.sleep(this.sleep);
+               return value;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
new file mode 100644
index 0000000..c9e9ac1
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+
+import java.io.Serializable;
+
+/**
+ * Special partitioner that uses the first field of a 2-tuple as the partition,
+ * and that expects a specific number of partitions.
+ */
+public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, 
Integer>> implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private final int expectedPartitions;
+
+       public Tuple2Partitioner(int expectedPartitions) {
+               this.expectedPartitions = expectedPartitions;
+       }
+
+       @Override
+       public int partition(Tuple2<Integer, Integer> next, byte[] 
serializedKey, byte[] serializedValue, int numPartitions) {
+               if (numPartitions != expectedPartitions) {
+                       throw new IllegalArgumentException("Expected " + 
expectedPartitions + " partitions");
+               }
+
+               return next.f0;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
new file mode 100644
index 0000000..7813561
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.test.util.SuccessException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.BitSet;
+
+public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> 
implements Checkpointed<Tuple2<Integer, BitSet>> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
+
+       private static final long serialVersionUID = 1748426382527469932L;
+       
+       private final int numElementsTotal;
+       
+       private BitSet duplicateChecker = new BitSet();  // this is checkpointed
+
+       private int numElements; // this is checkpointed
+
+       
+       public ValidatingExactlyOnceSink(int numElementsTotal) {
+               this.numElementsTotal = numElementsTotal;
+       }
+
+       
+       @Override
+       public void invoke(Integer value) throws Exception {
+               numElements++;
+               
+               if (duplicateChecker.get(value)) {
+                       throw new Exception("Received a duplicate: " + value);
+               }
+               duplicateChecker.set(value);
+               if (numElements == numElementsTotal) {
+                       // validate
+                       if (duplicateChecker.cardinality() != numElementsTotal) 
{
+                               throw new Exception("Duplicate checker has 
wrong cardinality");
+                       }
+                       else if (duplicateChecker.nextClearBit(0) != 
numElementsTotal) {
+                               throw new Exception("Received sparse sequence");
+                       }
+                       else {
+                               throw new SuccessException();
+                       }
+               }
+       }
+
+       @Override
+       public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long 
checkpointTimestamp) {
+               LOG.info("Snapshot of counter "+numElements+" at checkpoint 
"+checkpointId);
+               return new Tuple2<>(numElements, duplicateChecker);
+       }
+
+       @Override
+       public void restoreState(Tuple2<Integer, BitSet> state) {
+               LOG.info("restoring num elements to {}", state.f0);
+               this.numElements = state.f0;
+               this.duplicateChecker = state.f1;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
new file mode 100644
index 0000000..8a4c408
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import java.nio.charset.Charset;
+
+/**
+ * Simple ZooKeeper serializer for Strings.
+ */
+public class ZooKeeperStringSerializer implements ZkSerializer {
+
+       private static final Charset CHARSET = Charset.forName("UTF-8");
+       
+       @Override
+       public byte[] serialize(Object data) {
+               if (data instanceof String) {
+                       return ((String) data).getBytes(CHARSET);
+               }
+               else {
+                       throw new 
IllegalArgumentException("ZooKeeperStringSerializer can only serialize 
strings.");
+               }
+       }
+
+       @Override
+       public Object deserialize(byte[] bytes) {
+               if (bytes == null) {
+                       return null;
+               }
+               else {
+                       return new String(bytes, CHARSET);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
 
b/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..6bdfb48
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
 
b/flink-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

Reply via email to