http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
new file mode 100644
index 0000000..e94adb5
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/** Test helper for talking to a JobManager through its actor gateway. */
+public class JobManagerCommunicationUtils {
+
+       /** Timeout used both for each ask and for waiting on its reply. */
+       private static final FiniteDuration ASK_TIMEOUT = new FiniteDuration(30, TimeUnit.SECONDS);
+
+       /**
+        * Cancels the single job that the given JobManager reports as running.
+        *
+        * @param jobManager gateway used to list the running jobs and to send the cancel message
+        * @throws Exception if the job list cannot be retrieved, if it does not hold exactly one job,
+        *         if that job is already in a terminal state, or if sending the cancel message fails
+        */
+       public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
+               Future<Object> runningJobsReply = jobManager.ask(JobManagerMessages.getRequestRunningJobsStatus(), ASK_TIMEOUT);
+
+               List<JobStatusMessage> runningJobs;
+               try {
+                       Object answer = Await.result(runningJobsReply, ASK_TIMEOUT);
+                       runningJobs = ((JobManagerMessages.RunningJobsStatus) answer).getStatusMessages();
+               } catch (Exception cause) {
+                       throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", cause);
+               }
+
+               if (runningJobs.isEmpty()) {
+                       throw new Exception("Could not cancel job - no running jobs");
+               } else if (runningJobs.size() > 1) {
+                       throw new Exception("Could not cancel job - more than one running job.");
+               }
+
+               JobStatusMessage onlyJob = runningJobs.get(0);
+               if (onlyJob.getJobState().isTerminalState()) {
+                       throw new Exception("Could not cancel job - job is not running any more");
+               }
+
+               // the reply is awaited only to surface failures; its content is not inspected
+               Future<Object> cancelReply = jobManager.ask(new JobManagerMessages.CancelJob(onlyJob.getJobId()), ASK_TIMEOUT);
+               try {
+                       Await.result(cancelReply, ASK_TIMEOUT);
+               } catch (Exception cause) {
+                       throw new Exception("Sending the 'cancel' message failed.", cause);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
new file mode 100644
index 0000000..50c57ab
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** Test double for {@link StreamingRuntimeContext}: reports fixed subtask info and rejects most other calls. */
+public class MockRuntimeContext extends StreamingRuntimeContext {
+       private final int parallelism;
+       private final int subtaskIndex;
+
+       public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
+               super(new MockStreamOperator(),
+                               new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
+                               Collections.<String, Accumulator<?, ?>>emptyMap());
+               this.parallelism = numberOfParallelSubtasks;
+               this.subtaskIndex = indexOfThisSubtask;
+       }
+
+       @Override
+       public int getNumberOfParallelSubtasks() {
+               return parallelism;
+       }
+
+       @Override
+       public int getIndexOfThisSubtask() {
+               return subtaskIndex;
+       }
+
+       @Override
+       public int getAttemptNumber() {
+               return 0;
+       }
+
+       @Override
+       public boolean isCheckpointingEnabled() {
+               return true;
+       }
+
+       @Override
+       public String getTaskName() {
+               return null;
+       }
+
+       @Override
+       public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
+               // noop
+       }
+
+       // ------------ unsupported operations: each of the following throws ------------
+       @Override
+       public ExecutionConfig getExecutionConfig() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public ClassLoader getUserCodeClassLoader() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Map<String, Accumulator<?, ?>> getAllAccumulators() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public IntCounter getIntCounter(String name) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public LongCounter getLongCounter(String name) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public DoubleCounter getDoubleCounter(String name) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Histogram getHistogram(String name) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public <RT> List<RT> getBroadcastVariable(String name) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public DistributedCache getDistributedCache() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
+               throw new UnsupportedOperationException();
+       }
+
+       private static class MockStreamOperator extends AbstractStreamOperator {
+               private static final long serialVersionUID = -1153976702711944427L;
+               @Override
+               public ExecutionConfig getExecutionConfig() {
+                       return new ExecutionConfig();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
new file mode 100644
index 0000000..e105e01
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/** Identity mapper that throws once it has seen more than {@code maxPartitions} distinct values of {@code value % numPartitions}. */
+public class PartitionValidatingMapper implements MapFunction<Integer, Integer> {
+
+       private static final long serialVersionUID = 1088381231244959088L;
+
+       private final int numPartitions;
+       private final int maxPartitions;
+
+       /** Distinct results of {@code value % numPartitions} observed so far by this instance. */
+       private final Set<Integer> myPartitions = new HashSet<>();
+
+       public PartitionValidatingMapper(int numPartitions, int maxPartitions) {
+               this.maxPartitions = maxPartitions;
+               this.numPartitions = numPartitions;
+       }
+
+       @Override
+       public Integer map(Integer value) throws Exception {
+               myPartitions.add(value % numPartitions);
+               if (myPartitions.size() <= maxPartitions) {
+                       return value;
+               }
+               // more distinct partitions than this instance is allowed to see
+               throw new Exception("Error: Elements from too many different partitions: " + myPartitions
+                               + ". Expect elements only from " + maxPartitions + " partitions");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
new file mode 100644
index 0000000..1d61229
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+/**
+ * Pass-through map function that pauses the calling thread before returning each element.
+ *
+ * @param <T> Type of the elements that are passed through.
+ */
+public class ThrottledMapper<T> implements MapFunction<T, T> {
+
+       private static final long serialVersionUID = 467008933767159126L;
+
+       /** Pause per element in milliseconds, as handed to {@link Thread#sleep(long)}. */
+       private final int sleep;
+
+       public ThrottledMapper(int sleep) {
+               this.sleep = sleep;
+       }
+
+       @Override
+       public T map(T value) throws Exception {
+               Thread.sleep(sleep);
+               return value;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
new file mode 100644
index 0000000..0844412
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+
+import java.io.Serializable;
+
+/**
+ * Partitioner for tests that routes each 2-tuple to the partition named by its
+ * first field and rejects any partition count other than the expected one.
+ */
+public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>> implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private final int expectedPartitions;
+
+       public Tuple2Partitioner(int expectedPartitions) {
+               this.expectedPartitions = expectedPartitions;
+       }
+
+       /**
+        * @return {@code next.f0}, used directly as the partition index
+        * @throws IllegalArgumentException if {@code numPartitions} differs from the expected count
+        */
+       @Override
+       public int partition(Tuple2<Integer, Integer> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+               if (numPartitions == expectedPartitions) {
+                       return next.f0;
+               }
+               throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
new file mode 100644
index 0000000..7813561
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.test.util.SuccessException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.BitSet;
+
+/** Sink that throws on a repeated element and throws {@link SuccessException} once exactly the values {@code 0 .. numElementsTotal - 1} have arrived. */
+public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements Checkpointed<Tuple2<Integer, BitSet>> {
+
+       private static final long serialVersionUID = 1748426382527469932L;
+
+       private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
+
+       private final int numElementsTotal;
+
+       /** Number of elements counted so far; saved and restored with checkpoints. */
+       private int numElements;
+
+       /** Has a bit set for every value received so far; saved and restored with checkpoints. */
+       private BitSet duplicateChecker = new BitSet();
+
+       public ValidatingExactlyOnceSink(int numElementsTotal) {
+               this.numElementsTotal = numElementsTotal;
+       }
+
+       @Override
+       public void invoke(Integer value) throws Exception {
+               numElements++;
+               if (duplicateChecker.get(value)) {
+                       throw new Exception("Received a duplicate: " + value);
+               }
+               duplicateChecker.set(value);
+
+               if (numElements != numElementsTotal) {
+                       return;
+               }
+               // expected count reached: bits 0 .. numElementsTotal - 1 must all be set, nothing else
+               if (duplicateChecker.cardinality() != numElementsTotal) {
+                       throw new Exception("Duplicate checker has wrong cardinality");
+               }
+               if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
+                       throw new Exception("Received sparse sequence");
+               }
+               throw new SuccessException();
+       }
+
+       @Override
+       public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) {
+               LOG.info("Snapshot of counter "+numElements+" at checkpoint "+checkpointId);
+               return new Tuple2<Integer, BitSet>(numElements, duplicateChecker);
+       }
+
+       @Override
+       public void restoreState(Tuple2<Integer, BitSet> state) {
+               LOG.info("restoring num elements to {}", state.f0);
+               this.numElements = state.f0;
+               this.duplicateChecker = state.f1;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..6bdfb48
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/pom.xml 
b/flink-streaming-connectors/flink-connector-kafka/pom.xml
deleted file mode 100644
index 7bd9bcb..0000000
--- a/flink-streaming-connectors/flink-connector-kafka/pom.xml
+++ /dev/null
@@ -1,141 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-streaming-connectors-parent</artifactId>
-               <version>1.0-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-connector-kafka</artifactId>
-       <name>flink-connector-kafka</name>
-
-       <packaging>jar</packaging>
-
-       <!-- Allow users to pass custom connector versions -->
-       <properties>
-               <kafka.version>0.8.2.0</kafka.version>
-       </properties>
-
-       <dependencies>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.kafka</groupId>
-                       <artifactId>kafka_${scala.binary.version}</artifactId>
-                       <version>${kafka.version}</version>
-                       <exclusions>
-                               <exclusion>
-                                       <groupId>com.sun.jmx</groupId>
-                                       <artifactId>jmxri</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>com.sun.jdmk</groupId>
-                                       <artifactId>jmxtools</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>log4j</groupId>
-                                       <artifactId>log4j</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.slf4j</groupId>
-                                       <artifactId>slf4j-simple</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>net.sf.jopt-simple</groupId>
-                                       <artifactId>jopt-simple</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.scala-lang</groupId>
-                                       <artifactId>scala-reflect</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.scala-lang</groupId>
-                                       <artifactId>scala-compiler</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>com.yammer.metrics</groupId>
-                                       <artifactId>metrics-annotation</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.xerial.snappy</groupId>
-                                       <artifactId>snappy-java</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-
-               <!-- force using the latest zkclient -->
-
-
-               <dependency>
-                       <groupId>com.google.guava</groupId>
-                       <artifactId>guava</artifactId>
-                       <version>${guava.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.curator</groupId>
-                       <artifactId>curator-test</artifactId>
-                       <version>${curator.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-shaded-curator-recipes</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-       </dependencies>
-
-       
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-failsafe-plugin</artifactId>
-                               <configuration>
-                                       <!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
-                                       <forkCount>1</forkCount>
-                               </configuration>
-                       </plugin>
-               </plugins>
-       </build>
-       
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
deleted file mode 100644
index 69ed9bf..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ /dev/null
@@ -1,815 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.cluster.Broker;
-import kafka.common.ErrorMapping;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import org.apache.commons.collections.map.LinkedMap;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
-import 
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.util.NetUtils;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.Node;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * The Flink Kafka Consumer is a streaming data source that pulls a parallel 
data stream from
- * Apache Kafka. The consumer can run in multiple parallel instances, each of 
which will pull
- * data from one or more Kafka partitions. 
- * 
- * <p>The Flink Kafka Consumer participates in checkpointing and guarantees 
that no data is lost
- * during a failure, and that the computation processes elements "exactly 
once". 
- * (Note: These guarantees naturally assume that Kafka itself does not lose 
any data.)</p>
- * 
- * <p>To support a variety of Kafka brokers, protocol versions, and offset 
committing approaches,
- * the Flink Kafka Consumer can be parametrized with a <i>fetcher</i> and an 
<i>offset handler</i>.</p>
- *
- * <h1>Fetcher</h1>
- * 
- * <p>The fetcher is responsible for pulling data from Kafka. Because Kafka has 
undergone a change in
- * protocols and APIs, there are currently two fetchers available:</p>
- * 
- * <ul>
- *     <li>{@link FetcherType#NEW_HIGH_LEVEL}: A fetcher based on the new 
Kafka consumer API.
- *         This fetcher is generally more robust, but works only with later 
versions of
- *         Kafka (&gt; 0.8.2).</li>
- *         
- *     <li>{@link FetcherType#LEGACY_LOW_LEVEL}: A fetcher based on the old 
low-level consumer API.
- *         This fetcher also works with older versions of Kafka (0.8.1). 
The fetcher interprets
- *         the old Kafka consumer properties, like:
- *         <ul>
- *             <li>socket.timeout.ms</li>
- *             <li>socket.receive.buffer.bytes</li>
- *             <li>fetch.message.max.bytes</li>
- *             <li>auto.offset.reset with the values "latest", "earliest" 
(unlike 0.8.2 behavior)</li>
- *             <li>fetch.wait.max.ms</li>
- *         </ul>
- *     </li>
- * </ul>
- * 
- * <h1>Offset handler</h1>
- * 
- * <p>Offsets whose records have been read and are checkpointed will be 
committed back to Kafka / ZooKeeper
- * by the offset handler. In addition, the offset handler finds the point 
where the source initially
- * starts reading from the stream, when the streaming job is started.</p>
- * 
- * <p>Currently, the source offers two different offset handlers:</p>
- * <ul>
- *     <li>{@link OffsetStore#KAFKA}: Use this offset handler when the Kafka 
brokers are managing the offsets,
- *         and hence offsets need to be committed to the Kafka brokers, rather 
than to ZooKeeper.
- *         Note that this offset handler works only on new versions of Kafka 
(0.8.2.x +) and
- *         with the {@link FetcherType#NEW_HIGH_LEVEL} fetcher.</li>
- *         
- *     <li>{@link OffsetStore#FLINK_ZOOKEEPER}: Use this offset handler when 
the offsets are managed
- *         by ZooKeeper, as in older versions of Kafka (0.8.1.x)</li>
- * </ul>
- * 
- * <p>Please note that Flink snapshots the offsets internally as part of its 
distributed checkpoints. The offsets
- * committed to Kafka / ZooKeeper are only to bring the outside view of 
progress in sync with Flink's view
- * of the progress. That way, monitoring and other jobs can get a view of how 
far the Flink Kafka consumer
- * has consumed a topic.</p>
- * 
- * <p><b>NOTE:</b> The implementation currently accesses partition metadata 
when the consumer
- * is constructed. That means that the client that submits the program needs 
to be able to
- * reach the Kafka brokers or ZooKeeper.</p>
- */
-public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
-               implements CheckpointNotifier, 
CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, 
ResultTypeQueryable<T> {
-
-       /**
-        * The offset store defines how acknowledged offsets are committed back 
to Kafka. Different
-        * options include letting Flink periodically commit to ZooKeeper, or 
letting Kafka manage the
-        * offsets (new Kafka versions only).
-        */
-       public enum OffsetStore {
-
-               /**
-                * Let Flink manage the offsets. Flink will periodically commit 
them to Zookeeper (usually after
-                * successful checkpoints), in the same structure as Kafka 
0.8.2.x
-                * 
-                * <p>Use this mode when using the source with Kafka 0.8.1.x 
brokers.</p>
-                */
-               FLINK_ZOOKEEPER,
-
-               /**
-                * Use the mechanisms in Kafka to commit offsets. Depending on 
the Kafka configuration, different
-                * mechanisms will be used (broker coordinator, zookeeper)
-                */ 
-               KAFKA
-       }
-
-       /**
-        * The fetcher type defines which code paths to use to pull data from 
the Kafka broker.
-        */
-       public enum FetcherType {
-
-               /**
-                * The legacy fetcher uses Kafka's old low-level consumer API.
-                * 
-                * <p>Use this fetcher for Kafka 0.8.1 brokers.</p>
-                */
-               LEGACY_LOW_LEVEL,
-
-               /**
-                * This fetcher uses a backport of the new consumer API to pull 
data from the Kafka broker.
-                * It is the fetcher that will be maintained in the future, and 
it already 
-                * handles certain failure cases with less overhead than the 
legacy fetcher.
-                * 
-                * <p>This fetcher works only with Kafka 0.8.2 and 0.8.3 (and future 
versions).</p>
-                */
-               NEW_HIGH_LEVEL
-       }
-       
-       // 
------------------------------------------------------------------------
-       
-       private static final long serialVersionUID = -6272159445203409112L;
-       
-       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaConsumer.class);
-
-       /** Magic number to define an unset offset. Negative offsets are not 
used by Kafka (invalid),
-        * and we pick a number that is probably (hopefully) not used by Kafka 
as a magic number for anything else. */
-       public static final long OFFSET_NOT_SET = -915623761776L;
-
-       /** The maximum number of pending non-committed checkpoints to track, 
to avoid memory leaks */
-       public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
-
-       /** Configuration key for the number of retries for getting the 
partition info */
-       public static final String GET_PARTITIONS_RETRIES_KEY = 
"flink.get-partitions.retry";
-
-       /** Default number of retries for getting the partition info. One retry 
means going through the full list of brokers */
-       public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;
-
-       
-       
-       // ------  Configuration of the Consumer -------
-       
-       /** The offset store where this consumer commits safe offsets */
-       private final OffsetStore offsetStore;
-
-       /** The type of fetcher to be used to pull data from Kafka */
-       private final FetcherType fetcherType;
-
-       /** List of partitions (including topics and leaders) to consume  */
-       private final List<KafkaTopicPartitionLeader> partitionInfos;
-       
-       /** The properties to parametrize the Kafka consumer and ZooKeeper 
client */ 
-       private final Properties props;
-
-       /** The schema to convert between Kafka's byte messages, and Flink's 
objects */
-       private final KeyedDeserializationSchema<T> deserializer;
-
-
-       // ------  Runtime State  -------
-
-       /** Data for pending but uncommitted checkpoints */
-       private final LinkedMap pendingCheckpoints = new LinkedMap();
-       
-       /** The fetcher used to pull data from the Kafka brokers */
-       private transient Fetcher fetcher;
-       
-       /** The committer that persists the committed offsets */
-       private transient OffsetHandler offsetHandler;
-       
-       /** The partitions actually handled by this consumer at runtime */
-       private transient List<KafkaTopicPartitionLeader> subscribedPartitions;
-
-       /** The offsets of the last returned elements */
-       private transient HashMap<KafkaTopicPartition, Long> lastOffsets;
-
-       /** The latest offsets that have been committed to Kafka or ZooKeeper. 
These are never
-        * newer than the last offsets (Flink's internal view is fresher) */
-       private transient HashMap<KafkaTopicPartition, Long> committedOffsets;
-       
-       /** The offsets to restore to, if the consumer restores state from a 
checkpoint */
-       private transient HashMap<KafkaTopicPartition, Long> restoreToOffset;
-       
-       private volatile boolean running = true;
-       
-       // 
------------------------------------------------------------------------
-
-
-       /**
-        * Creates a new Flink Kafka Consumer, using the given type of fetcher 
and offset handler.
-        *
-        * <p>To determine which kind of fetcher and offset handler to use, 
please refer to the docs
-        * at the beginning of this class.</p>
-        *
-        * @param topic
-        *           The Kafka topic to read from.
-        * @param deserializer
-        *           The deserializer to turn raw byte messages (without key) 
into Java/Scala objects.
-        * @param props
-        *           The properties that are used to configure both the fetcher 
and the offset handler.
-        * @param offsetStore
-        *           The type of offset store to use (Kafka / ZooKeeper)
-        * @param fetcherType
-        *           The type of fetcher to use (new high-level API, old 
low-level API).
-        */
-       public FlinkKafkaConsumer(List<String> topic, DeserializationSchema<T> 
deserializer, Properties props,
-                                                       OffsetStore 
offsetStore, FetcherType fetcherType) {
-               this(topic, new 
KeyedDeserializationSchemaWrapper<>(deserializer),
-                               props, offsetStore, fetcherType);
-       }
-
-       /**
-        * Creates a new Flink Kafka Consumer, using the given type of fetcher 
and offset handler.
-        * 
-        * <p>To determine which kind of fetcher and offset handler to use, 
please refer to the docs
-        * at the beginning of this class.</p>
-        * 
-        * @param topics
-        *           The Kafka topics to read from.
-        * @param deserializer
-        *           The deserializer to turn raw byte messages into Java/Scala 
objects.
-        * @param props
-        *           The properties that are used to configure both the fetcher 
and the offset handler.
-        * @param offsetStore
-        *           The type of offset store to use (Kafka / ZooKeeper)
-        * @param fetcherType
-        *           The type of fetcher to use (new high-level API, old 
low-level API).
-        */
-       public FlinkKafkaConsumer(List<String> topics, 
KeyedDeserializationSchema<T> deserializer, Properties props,
-                                                               OffsetStore 
offsetStore, FetcherType fetcherType) {
-               this.offsetStore = checkNotNull(offsetStore);
-               this.fetcherType = checkNotNull(fetcherType);
-
-               if (fetcherType == FetcherType.NEW_HIGH_LEVEL) {
-                       throw new UnsupportedOperationException("The fetcher 
for Kafka 0.8.3 / 0.9.0 is not yet " +
-                                       "supported in Flink");
-               }
-               if (offsetStore == OffsetStore.KAFKA && fetcherType == 
FetcherType.LEGACY_LOW_LEVEL) {
-                       throw new IllegalArgumentException(
-                                       "The Kafka offset handler cannot be 
used together with the old low-level fetcher.");
-               }
-               
-               checkNotNull(topics, "topics");
-               this.props = checkNotNull(props, "props");
-               this.deserializer = checkNotNull(deserializer, 
"valueDeserializer");
-
-               // validate the zookeeper properties
-               if (offsetStore == OffsetStore.FLINK_ZOOKEEPER) {
-                       validateZooKeeperConfig(props);
-               }
-               
-               // Connect to a broker to get the partitions for all topics
-               this.partitionInfos = getPartitionsForTopic(topics, props);
-
-               if (partitionInfos.size() == 0) {
-                       throw new RuntimeException("Unable to retrieve any 
partitions for the requested topics " + topics.toString() + "." +
-                                       "Please check previous log entries");
-               }
-
-               if (LOG.isInfoEnabled()) {
-                       Map<String, Integer> countPerTopic = new HashMap<>();
-                       for (KafkaTopicPartitionLeader partition : 
partitionInfos) {
-                               Integer count = 
countPerTopic.get(partition.getTopicPartition().getTopic());
-                               if (count == null) {
-                                       count = 1;
-                               } else {
-                                       count++;
-                               }
-                               
countPerTopic.put(partition.getTopicPartition().getTopic(), count);
-                       }
-                       StringBuilder sb = new StringBuilder();
-                       for (Map.Entry<String, Integer> e : 
countPerTopic.entrySet()) {
-                               sb.append(e.getKey()).append(" 
(").append(e.getValue()).append("), ");
-                       }
-                       LOG.info("Consumer is going to read the following 
topics (with number of partitions): ", sb.toString());
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Source life cycle
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               super.open(parameters);
-               
-               final int numConsumers = 
getRuntimeContext().getNumberOfParallelSubtasks();
-               final int thisConsumerIndex = 
getRuntimeContext().getIndexOfThisSubtask();
-               
-               // pick which partitions we work on
-               subscribedPartitions = assignPartitions(this.partitionInfos, 
numConsumers, thisConsumerIndex);
-               
-               if (LOG.isInfoEnabled()) {
-                       LOG.info("Kafka consumer {} will read partitions {} out 
of partitions {}",
-                                       thisConsumerIndex, 
KafkaTopicPartitionLeader.toString(subscribedPartitions), 
this.partitionInfos.size());
-               }
-
-               // we leave the fetcher as null, if we have no partitions
-               if (subscribedPartitions.isEmpty()) {
-                       LOG.info("Kafka consumer {} has no partitions (empty 
source)", thisConsumerIndex);
-                       return;
-               }
-               
-               // create fetcher
-               switch (fetcherType){
-                       case NEW_HIGH_LEVEL:
-                               throw new 
UnsupportedOperationException("Currently unsupported");
-                       case LEGACY_LOW_LEVEL:
-                               fetcher = new 
LegacyFetcher(this.subscribedPartitions, props, 
getRuntimeContext().getTaskName());
-                               break;
-                       default:
-                               throw new RuntimeException("Requested unknown 
fetcher " + fetcher);
-               }
-
-               // offset handling
-               switch (offsetStore){
-                       case FLINK_ZOOKEEPER:
-                               offsetHandler = new 
ZookeeperOffsetHandler(props);
-                               break;
-                       case KAFKA:
-                               throw new Exception("Kafka offset handler 
cannot work with legacy fetcher");
-                       default:
-                               throw new RuntimeException("Requested unknown 
offset store " + offsetStore);
-               }
-               
-               committedOffsets = new HashMap<>();
-
-               // seek to last known pos, from restore request
-               if (restoreToOffset != null) {
-                       if (LOG.isInfoEnabled()) {
-                               LOG.info("Consumer {} is restored from previous 
checkpoint: {}",
-                                               thisConsumerIndex, 
KafkaTopicPartition.toString(restoreToOffset));
-                       }
-                       
-                       for (Map.Entry<KafkaTopicPartition, Long> 
restorePartition: restoreToOffset.entrySet()) {
-                               // seek fetcher to restore position
-                               // we set the offset +1 here, because seek() is 
accepting the next offset to read,
-                               // but the restore offset is the last read 
offset
-                               fetcher.seek(restorePartition.getKey(), 
restorePartition.getValue() + 1);
-                       }
-                       // initialize offsets with restored state
-                       this.lastOffsets = restoreToOffset;
-                       restoreToOffset = null;
-               }
-               else {
-                       // start with empty offsets
-                       lastOffsets = new HashMap<>();
-
-                       // no restore request. Let the offset handler take care 
of the initial offset seeking
-                       
offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
-               }
-       }
-
-       @Override
-       public void run(SourceContext<T> sourceContext) throws Exception {
-               if (fetcher != null) {
-                       // For non-checkpointed sources, a thread which 
periodically commits the current offset into ZK.
-                       PeriodicOffsetCommitter<T> offsetCommitter = null;
-
-                       // check whether we need to start the periodic 
checkpoint committer
-                       StreamingRuntimeContext streamingRuntimeContext = 
(StreamingRuntimeContext) getRuntimeContext();
-                       if (!streamingRuntimeContext.isCheckpointingEnabled()) {
-                               // we use Kafka's own configuration parameter 
key for this.
-                               // Note that the default configuration value in 
Kafka is 60 * 1000, so we use the
-                               // same here.
-                               long commitInterval = 
Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
-                               offsetCommitter = new 
PeriodicOffsetCommitter<>(commitInterval, this);
-                               offsetCommitter.setDaemon(true);
-                               offsetCommitter.start();
-                               LOG.info("Starting periodic offset committer, 
with commit interval of {}ms", commitInterval);
-                       }
-
-                       try {
-                               fetcher.run(sourceContext, deserializer, 
lastOffsets);
-                       } finally {
-                               if (offsetCommitter != null) {
-                                       offsetCommitter.close();
-                                       try {
-                                               offsetCommitter.join();
-                                       } catch(InterruptedException ie) {
-                                               // ignore interrupt
-                                       }
-                               }
-                       }
-               }
-               else {
-                       // this source never completes, so emit a 
Long.MAX_VALUE watermark
-                       // to not block watermark forwarding
-                       if 
(getRuntimeContext().getExecutionConfig().areTimestampsEnabled()) {
-                               sourceContext.emitWatermark(new 
Watermark(Long.MAX_VALUE));
-                       }
-
-                       final Object waitLock = new Object();
-                       while (running) {
-                               // wait until we are canceled
-                               try {
-                                       //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
-                                       synchronized (waitLock) {
-                                               waitLock.wait();
-                                       }
-                               }
-                               catch (InterruptedException e) {
-                                       // do nothing, check our "running" 
status
-                               }
-                       }
-               }
-               
-               // close the context after the work was done. this can actually 
only
-               // happen when the fetcher decides to stop fetching
-               sourceContext.close();
-       }
-
-       @Override
-       public void cancel() {
-               // set ourselves as not running
-               running = false;
-               
-               // close the fetcher to interrupt any work
-               Fetcher fetcher = this.fetcher;
-               this.fetcher = null;
-               if (fetcher != null) {
-                       try {
-                               fetcher.close();
-                       }
-                       catch (IOException e) {
-                               LOG.warn("Error while closing Kafka connector 
data fetcher", e);
-                       }
-               }
-               
-               OffsetHandler offsetHandler = this.offsetHandler;
-               this.offsetHandler = null;
-               if (offsetHandler != null) {
-                       try {
-                               offsetHandler.close();
-                       }
-                       catch (IOException e) {
-                               LOG.warn("Error while closing Kafka connector 
offset handler", e);
-                       }
-               }
-       }
-
-       @Override
-       public void close() throws Exception {
-               cancel();
-               super.close();
-       }
-
-       @Override
-       public TypeInformation<T> getProducedType() {
-               return deserializer.getProducedType();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Checkpoint and restore
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public HashMap<KafkaTopicPartition, Long> snapshotState(long 
checkpointId, long checkpointTimestamp) throws Exception {
-               if (lastOffsets == null) {
-                       LOG.debug("snapshotState() requested on not yet opened 
source; returning null.");
-                       return null;
-               }
-               if (!running) {
-                       LOG.debug("snapshotState() called on closed source");
-                       return null;
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Snapshotting state. Offsets: {}, checkpoint 
id: {}, timestamp: {}",
-                                       
KafkaTopicPartition.toString(lastOffsets), checkpointId, checkpointTimestamp);
-               }
-
-               // the use of clone() is okay here, we just need a new 
map, the keys are not changed
-               @SuppressWarnings("unchecked")
-               HashMap<KafkaTopicPartition, Long> currentOffsets = 
(HashMap<KafkaTopicPartition, Long>) lastOffsets.clone();
-
-               // the map cannot be asynchronously updated, because only one 
checkpoint call can happen
-               // on this function at a time: either snapshotState() or 
notifyCheckpointComplete()
-               pendingCheckpoints.put(checkpointId, currentOffsets);
-                       
-               while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) 
{
-                       pendingCheckpoints.remove(0);
-               }
-
-               return currentOffsets;
-       }
-
-       @Override
-       public void restoreState(HashMap<KafkaTopicPartition, Long> 
restoredOffsets) {
-               restoreToOffset = restoredOffsets;
-       }
-
-       @Override
-       public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
-               if (fetcher == null) {
-                       LOG.debug("notifyCheckpointComplete() called on 
uninitialized source");
-                       return;
-               }
-               if (!running) {
-                       LOG.debug("notifyCheckpointComplete() called on closed 
source");
-                       return;
-               }
-               
-               // only one commit operation must be in progress
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Committing offsets externally for checkpoint 
{}", checkpointId);
-               }
-
-               try {
-                       HashMap<KafkaTopicPartition, Long> checkpointOffsets;
-       
-                       // the map may be asynchronously updated when 
snapshotting state, so we synchronize
-                       synchronized (pendingCheckpoints) {
-                               final int posInMap = 
pendingCheckpoints.indexOf(checkpointId);
-                               if (posInMap == -1) {
-                                       LOG.warn("Received confirmation for 
unknown checkpoint id {}", checkpointId);
-                                       return;
-                               }
-
-                               //noinspection unchecked
-                               checkpointOffsets = 
(HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
-                               
-                               // remove older checkpoints in map
-                               for (int i = 0; i < posInMap; i++) {
-                                       pendingCheckpoints.remove(0);
-                               }
-                       }
-                       if (checkpointOffsets == null || 
checkpointOffsets.size() == 0) {
-                               LOG.info("Checkpoint state was empty.");
-                               return;
-                       }
-                       commitOffsets(checkpointOffsets, this);
-               }
-               catch (Exception e) {
-                       if (running) {
-                               throw e;
-                       }
-                       // else ignore exception if we are no longer running
-               }
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  Miscellaneous utilities 
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Selects the partitions that one parallel consumer instance should subscribe to.
-        *
-        * <p>Partitions are dealt out round-robin by their position in the given list: the
-        * partition at position {@code i} belongs to the consumer for which
-        * {@code i % numConsumers == consumerIndex} holds.
-        *
-        * @param partitions all partitions that are available for subscription
-        * @param numConsumers total number of parallel consumers, must be positive
-        * @param consumerIndex index of the consumer to select for, must be smaller than {@code numConsumers}
-        * @return the partitions assigned to the consumer with the given index, in list order
-        */
-       protected static List<KafkaTopicPartitionLeader> assignPartitions(List<KafkaTopicPartitionLeader> partitions, int numConsumers, int consumerIndex) {
-               checkArgument(numConsumers > 0);
-               checkArgument(consumerIndex < numConsumers);
-
-               final List<KafkaTopicPartitionLeader> assigned = new ArrayList<>();
-               int position = 0;
-               for (KafkaTopicPartitionLeader candidate : partitions) {
-                       // keep every partition whose list position maps onto this consumer
-                       final boolean belongsToConsumer = position % numConsumers == consumerIndex;
-                       position++;
-                       if (belongsToConsumer) {
-                               assigned.add(candidate);
-                       }
-               }
-               return assigned;
-       }
-
-       /**
-        * Background thread that commits the consumer's most recently read offsets
-        * at a fixed interval until it is closed.
-        */
-       private static class PeriodicOffsetCommitter<T> extends Thread {
-
-               private final long commitInterval;
-               private final FlinkKafkaConsumer<T> consumer;
-
-               private volatile boolean running = true;
-
-               public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer<T> consumer) {
-                       this.commitInterval = commitInterval;
-                       this.consumer = consumer;
-               }
-
-               @Override
-               public void run() {
-                       try {
-                               while (running) {
-                                       try {
-                                               sleepThenCommit();
-                                       } catch (InterruptedException interruption) {
-                                               if (!running) {
-                                                       // interrupted by close(): re-check the loop condition and leave
-                                                       continue;
-                                               }
-                                               // throw unexpected interruption
-                                               throw interruption;
-                                       }
-                               }
-                       } catch (Throwable t) {
-                               LOG.warn("Periodic checkpoint committer is stopping the fetcher because of an error", t);
-                               consumer.fetcher.stopWithError(t);
-                       }
-               }
-
-               /**
-                * Waits for one commit interval, then commits a copy of the consumer's current offsets.
-                */
-               private void sleepThenCommit() throws Exception {
-                       Thread.sleep(commitInterval);
-
-                       // work on a copy of the current offsets
-                       //noinspection unchecked
-                       HashMap<KafkaTopicPartition, Long> offsetSnapshot = (HashMap<KafkaTopicPartition, Long>) consumer.lastOffsets.clone();
-                       commitOffsets(offsetSnapshot, this.consumer);
-               }
-
-               /**
-                * Stops the commit loop and interrupts the thread so that a pending sleep ends.
-                */
-               public void close() {
-                       this.running = false;
-                       this.interrupt();
-               }
-
-       }
-
-       /**
-        * Commits the given offsets through the consumer's offset handler.
-        *
-        * <p>Only the consumer's subscribed partitions are considered. A partition is skipped
-        * when the map has no entry for it, when its offset is {@code OFFSET_NOT_SET}, or when
-        * its offset is not larger than the offset last recorded as committed.
-        *
-        * @param toCommit the offsets to commit
-        * @param consumer consumer reference
-        * @param <T> message type
-        * @throws Exception declared by this method; NOTE(review): presumably raised by the offset handler's commit, confirm
-        */
-       private static <T> void commitOffsets(HashMap<KafkaTopicPartition, Long> toCommit, FlinkKafkaConsumer<T> consumer) throws Exception {
-               final Map<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>();
-
-               for (KafkaTopicPartitionLeader subscribed : consumer.subscribedPartitions) {
-                       final KafkaTopicPartition partition = subscribed.getTopicPartition();
-
-                       final Long offset = toCommit.get(partition);
-                       if (offset == null) {
-                               // There was no data ever consumed from this topic, that's why there is no entry
-                               // for this topicPartition in the map.
-                               continue;
-                       }
-
-                       Long lastCommitted = consumer.committedOffsets.get(partition);
-                       if (lastCommitted == null) {
-                               lastCommitted = OFFSET_NOT_SET;
-                       }
-
-                       if (offset == OFFSET_NOT_SET) {
-                               continue;
-                       }
-                       if (offset <= lastCommitted) {
-                               LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
-                               continue;
-                       }
-
-                       offsetsToCommit.put(partition, offset);
-                       consumer.committedOffsets.put(partition, offset);
-                       LOG.debug("Committing offset {} for partition {}", offset, partition);
-               }
-
-               if (LOG.isDebugEnabled() && offsetsToCommit.size() > 0) {
-                       LOG.debug("Committing offsets {} to offset store: {}", KafkaTopicPartition.toString(offsetsToCommit), consumer.offsetStore);
-               }
-
-               consumer.offsetHandler.commit(offsetsToCommit);
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  Kafka / ZooKeeper communication utilities
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Send request to Kafka to get partitions for topic.
-        *
-        * <p>The seed brokers from the bootstrap servers property are asked one after the other,
-        * starting at a randomly chosen broker. A full round over all brokers is repeated up to
-        * the configured number of retries. Only metadata that was retrieved completely from a
-        * single broker is returned; partial results of failed attempts are discarded.
-        *
-        * @param topics The name of the topics.
-        * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic.
-        * @return The partitions of the requested topics together with their leaders, or an empty
-        *         list if no broker delivered complete metadata within the retries.
-        * @throws RuntimeException if a broker reports a requested topic as invalid or unknown
-        */
-       public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(final List<String> topics, final Properties properties) {
-               String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
-               final int numRetries = Integer.parseInt(properties.getProperty(GET_PARTITIONS_RETRIES_KEY, Integer.toString(DEFAULT_GET_PARTITIONS_RETRIES)));
-
-               checkNotNull(seedBrokersConfString, "Configuration property " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " not set");
-               String[] seedBrokers = seedBrokersConfString.split(",");
-
-               Random rnd = new Random();
-               for (int retry = 0; retry < numRetries; retry++) {
-                       // we pick a seed broker randomly to avoid overloading the first broker with all the requests when the
-                       // parallel source instances start. Still, we try all available brokers.
-                       int index = rnd.nextInt(seedBrokers.length);
-                       brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokers.length; arrIdx++) {
-                               String seedBroker = seedBrokers[index];
-                               LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBroker, retry, numRetries);
-                               if (++index == seedBrokers.length) {
-                                       index = 0;
-                               }
-
-                               URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker);
-                               SimpleConsumer consumer = null;
-                               // remembers the "unknown topic" failure so that the generic catch below can tell it
-                               // apart from communication errors and let it reach the caller
-                               RuntimeException unknownTopicFailure = null;
-                               try {
-                                       final String clientId = "flink-kafka-consumer-partition-lookup";
-                                       final int soTimeout = Integer.parseInt(properties.getProperty("socket.timeout.ms", "30000"));
-                                       final int bufferSize = Integer.parseInt(properties.getProperty("socket.receive.buffer.bytes", "65536"));
-                                       consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId);
-
-                                       TopicMetadataRequest req = new TopicMetadataRequest(topics);
-                                       kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
-
-                                       List<TopicMetadata> metaData = resp.topicsMetadata();
-
-                                       // collected per attempt, so that an aborted attempt can never leak an incomplete list
-                                       List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
-                                       for (TopicMetadata item : metaData) {
-                                               if (item.errorCode() != ErrorMapping.NoError()) {
-                                                       if (item.errorCode() == ErrorMapping.InvalidTopicCode() || item.errorCode() == ErrorMapping.UnknownTopicOrPartitionCode()) {
-                                                               // fail hard if topic is unknown
-                                                               unknownTopicFailure = new RuntimeException("Requested partitions for unknown topic", ErrorMapping.exceptionFor(item.errorCode()));
-                                                               throw unknownTopicFailure;
-                                                       }
-                                                       // warn and try more brokers
-                                                       LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions " +
-                                                                       "for " + topics.toString() + ". Error: " + ErrorMapping.exceptionFor(item.errorCode()).getMessage());
-                                                       continue brokersLoop;
-                                               }
-                                               if (!topics.contains(item.topic())) {
-                                                       LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
-                                                       continue brokersLoop;
-                                               }
-                                               for (PartitionMetadata part : item.partitionsMetadata()) {
-                                                       Node leader = brokerToNode(part.leader());
-                                                       KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId());
-                                                       KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader);
-                                                       partitions.add(pInfo);
-                                               }
-                                       }
-                                       // complete metadata from one broker: done (the finally block still closes the consumer)
-                                       return partitions;
-                               } catch (Exception e) {
-                                       if (e == unknownTopicFailure) {
-                                               // an unknown topic cannot be cured by asking another broker
-                                               throw unknownTopicFailure;
-                                       }
-                                       LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString(), e);
-                               } finally {
-                                       if (consumer != null) {
-                                               consumer.close();
-                                       }
-                               }
-                       } // brokers loop
-               } // retries loop
-
-               // no broker delivered complete metadata
-               return new ArrayList<>();
-       }
-
-       /**
-        * Turns a broker instance into a node instance by handing the broker's
-        * id, host and port to the {@code Node} constructor.
-        *
-        * @param broker broker instance
-        * @return Node representing the given broker
-        */
-       private static Node brokerToNode(Broker broker) {
-               return new Node(broker.id(), broker.host(), broker.port());
-       }
-
-       /**
-        * Validate the ZK configuration, checking for required parameters.
-        *
-        * <p>{@code zookeeper.connect} and the consumer group id must be present. The two
-        * ZooKeeper timeouts are optional, but must parse as integers when they are set.
-        *
-        * @param props Properties to check
-        * @throws IllegalArgumentException if a required property is missing or a timeout is not a valid integer
-        */
-       protected static void validateZooKeeperConfig(Properties props) {
-               if (props.getProperty("zookeeper.connect") == null) {
-                       throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");
-               }
-               if (props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
-                       throw new IllegalArgumentException("Required property '" + ConsumerConfig.GROUP_ID_CONFIG
-                                       + "' has not been set in the properties");
-               }
-
-               checkOptionalIntegerProperty(props, "zookeeper.session.timeout.ms");
-               checkOptionalIntegerProperty(props, "zookeeper.connection.timeout.ms");
-       }
-
-       /**
-        * Checks that the given property, if set, holds a valid integer.
-        *
-        * @param props Properties to check
-        * @param key name of the property to validate
-        * @throws IllegalArgumentException if the property value cannot be parsed as an integer
-        */
-       private static void checkOptionalIntegerProperty(Properties props, String key) {
-               try {
-                       //noinspection ResultOfMethodCallIgnored
-                       Integer.parseInt(props.getProperty(key, "0"));
-               }
-               catch (NumberFormatException e) {
-                       // keep the parse failure as the cause so the offending value stays visible
-                       throw new IllegalArgumentException("Property '" + key + "' is not a valid integer", e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
deleted file mode 100644
index abe33aa..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Collections;
-import java.util.Properties;
-
-/**
- * Creates a Kafka consumer compatible with reading from Kafka 0.8.1.x brokers.
- * The consumer will internally use the old low-level Kafka API, and manually
- * commit partition offsets to ZooKeeper.
- * 
- * <p>The following additional configuration values are available:</p>
- * <ul>
- *   <li>socket.timeout.ms</li>
- *   <li>socket.receive.buffer.bytes</li>
- *   <li>fetch.message.max.bytes</li>
- *   <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)</li>
- *   <li>fetch.wait.max.ms</li>
- * </ul>
- * 
- * @param <T> The type of elements produced by this consumer.
- */
-public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer<T> {
-
-       private static final long serialVersionUID = -5649906773771949146L;
-
-       /**
-        * Creates a new Kafka 0.8.1.x streaming source consumer for a single topic.
-        * The consumer is set up with the ZooKeeper offset store and the legacy
-        * low-level fetcher.
-        *
-        * @param topic
-        *           The name of the topic that should be consumed.
-        * @param valueDeserializer
-        *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
-        * @param props
-        *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
-        */
-       public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> 
valueDeserializer, Properties props) {
-               super(Collections.singletonList(topic), valueDeserializer, 
props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
deleted file mode 100644
index adc42de..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Creates a Kafka consumer compatible with reading from Kafka 0.8.2.x brokers.
- * The consumer will internally use the old low-level Kafka API, and manually
- * commit partition offsets to ZooKeeper.
- *
- * Once Kafka releases the new consumer with Kafka 0.8.3, Flink might use the
- * 0.8.3 consumer API also against Kafka 0.8.2 installations.
- *
- * @param <T> The type of elements produced by this consumer.
- */
-public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer<T> {
-
-       private static final long serialVersionUID = -8450689820627198228L;
-
-       /**
-        * Creates a new Kafka 0.8.2.x streaming source consumer.
-        * 
-        * @param topic
-        *           The name of the topic that should be consumed.
-        * @param valueDeserializer
-        *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
-        * @param props
-        *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
-        */
-       public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> 
valueDeserializer, Properties props) {
-               super(Collections.singletonList(topic), valueDeserializer, 
props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
-       }
-
-
-       //----- key-value deserializer constructor
-
-       /**
-        * Creates a new Kafka 0.8.2.x streaming source consumer.
-        *
-        * This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
-        * pairs from Kafka.
-        *
-        * @param topic
-        *           The name of the topic that should be consumed.
-        * @param deserializer
-        *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
-        * @param props
-        *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
-        */
-       public FlinkKafkaConsumer082(String topic, 
KeyedDeserializationSchema<T> deserializer, Properties props) {
-               super(Collections.singletonList(topic), deserializer, props, 
OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
-       }
-
-       //----- topic list constructors
-
-
-       /**
-        * Creates a new Kafka 0.8.2.x streaming source consumer that reads from several topics.
-        *
-        * @param topics
-        *           The names of the topics that should be consumed.
-        * @param valueDeserializer
-        *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
-        * @param props
-        *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
-        */
-       public FlinkKafkaConsumer082(List<String> topics, 
DeserializationSchema<T> valueDeserializer, Properties props) {
-               super(topics, valueDeserializer, props, 
OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
-       }
-
-       /**
-        * Creates a new Kafka 0.8.2.x streaming source consumer that reads from several topics,
-        * using a {@link KeyedDeserializationSchema} for reading key/value pairs from Kafka.
-        *
-        * @param topics
-        *           The names of the topics that should be consumed.
-        * @param deserializer
-        *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
-        * @param props
-        *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
-        */
-       public FlinkKafkaConsumer082(List<String> topics, 
KeyedDeserializationSchema<T> deserializer, Properties props) {
-               super(topics, deserializer, props, OffsetStore.FLINK_ZOOKEEPER, 
FetcherType.LEGACY_LOW_LEVEL);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
deleted file mode 100644
index 7e01b54..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.util.NetUtils;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Properties;
-
-
-/**
- * Flink Sink to produce data into a Kafka topic.
- *
- * Please note that this producer does not have any reliability guarantees.
- *
- * @param <IN> Type of the messages to write into Kafka.
- */
-public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN>  {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaProducer.class);
-
-       private static final long serialVersionUID = 1L;
-
-       /**
-        * Array with the partition ids of the given topicId
-        * The size of this array is the number of partitions
-        */
-       private final int[] partitions;
-
-       /**
-        * User defined properties for the Producer
-        */
-       private final Properties producerConfig;
-
-       /**
-        * The name of the topic this producer is writing data to
-        */
-       private final String topicId;
-
-       /**
-        * (Serializable) SerializationSchema for turning objects used with 
Flink into
-        * byte[] for Kafka.
-        */
-       private final KeyedSerializationSchema<IN> schema;
-
-       /**
-        * User-provided partitioner for assigning an object to a Kafka 
partition.
-        */
-       private final KafkaPartitioner partitioner;
-
-       /**
-        * Flag indicating whether to accept failures (and log them), or to 
fail on failures
-        */
-       private boolean logFailuresOnly;
-       
-       // -------------------------------- Runtime fields 
------------------------------------------
-
-       /** KafkaProducer instance */
-       private transient KafkaProducer<byte[], byte[]> producer;
-
-       /** The callback that handles error propagation or logging callbacks */
-       private transient Callback callback;
-       
-       /** Errors encountered in the async producer are stored here */
-       private transient volatile Exception asyncException;
-
-       // ------------------- Keyless serialization schema constructors 
----------------------
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
-        * the topic.
-        *
-        * <p>The keyless schema is wrapped into a {@link KeyedSerializationSchemaWrapper}, the
-        * producer properties are derived from the broker list, and no custom partitioner is passed.
-        *
-        * @param brokerList
-        *                      Comma separated addresses of the brokers
-        * @param topicId
-        *                      ID of the Kafka topic.
-        * @param serializationSchema
-        *                      User defined (keyless) serialization schema.
-        */
-       public FlinkKafkaProducer(String brokerList, String topicId, 
SerializationSchema<IN> serializationSchema) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), 
getPropertiesFromBrokerList(brokerList), null);
-       }
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
-        * the topic.
-        *
-        * <p>The keyless schema is wrapped into a {@link KeyedSerializationSchemaWrapper} and
-        * no custom partitioner is passed.
-        *
-        * @param topicId
-        *                      ID of the Kafka topic.
-        * @param serializationSchema
-        *                      User defined (keyless) serialization schema.
-        * @param producerConfig
-        *                      Properties with the producer configuration.
-        */
-       public FlinkKafkaProducer(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, null);
-       }
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic with a keyless serialization schema
-        * and a custom partitioner. The schema is wrapped into a
-        * {@link KeyedSerializationSchemaWrapper} and handed to the key/value constructor.
-        *
-        * @param topicId The topic to write data to
-        * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
-        * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
-        */
-       public FlinkKafkaProducer(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, KafkaPartitioner 
customPartitioner) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
-
-       }
-
-       // ------------------- Key/Value serialization schema constructors 
----------------------
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
-        * the topic.
-        *
-        * <p>The producer properties are derived from the broker list and no custom
-        * partitioner is passed.
-        *
-        * @param brokerList
-        *                      Comma separated addresses of the brokers
-        * @param topicId
-        *                      ID of the Kafka topic.
-        * @param serializationSchema
-        *                      User defined serialization schema supporting key/value messages
-        */
-       public FlinkKafkaProducer(String brokerList, String topicId, 
KeyedSerializationSchema<IN> serializationSchema) {
-               this(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList), null);
-       }
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
-        * the topic. No custom partitioner is passed.
-        *
-        * @param topicId
-        *                      ID of the Kafka topic.
-        * @param serializationSchema
-        *                      User defined serialization schema supporting key/value messages
-        * @param producerConfig
-        *                      Properties with the producer configuration.
-        */
-       public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> 
serializationSchema, Properties producerConfig) {
-               this(topicId, serializationSchema, producerConfig, null);
-       }
-
-       /**
-        * The main constructor for creating a FlinkKafkaProducer.
-        *
-        * <p>Unless the configuration already names key and value serializers, both are set to
-        * the {@link ByteArraySerializer}. A short-lived KafkaProducer is created to look up the
-        * partitions of the topic, which also checks locally that all required producer
-        * configuration values are set.
-        *
-        * @param topicId The topic to write data to
-        * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
-        * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. If null, a {@link FixedPartitioner} is used.
-        */
-       public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
-               Preconditions.checkNotNull(topicId, "TopicID not set");
-               Preconditions.checkNotNull(serializationSchema, "serializationSchema not set");
-               Preconditions.checkNotNull(producerConfig, "producerConfig not set");
-               ClosureCleaner.ensureSerializable(customPartitioner);
-               ClosureCleaner.ensureSerializable(serializationSchema);
-
-               this.topicId = topicId;
-               this.schema = serializationSchema;
-               this.producerConfig = producerConfig;
-
-               // set the producer configuration properties.
-               // Note: the lookup must be containsKey(); Properties inherits contains(Object) from
-               // Hashtable, which searches the values and would never find the configuration key.
-
-               if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
-                       this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
-               } else {
-                       LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
-               }
-
-               if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
-                       this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
-               } else {
-                       LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
-               }
-
-
-               // create a local KafkaProducer to get the list of partitions.
-               // this will also ensure locally that all required ProducerConfig values are set.
-               // try-with-resources closes the producer, so no explicit close() is needed.
-               try (KafkaProducer<Void, IN> getPartitionsProd = new KafkaProducer<>(this.producerConfig)) {
-                       List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(topicId);
-
-                       this.partitions = new int[partitionsList.size()];
-                       for (int i = 0; i < partitions.length; i++) {
-                               partitions[i] = partitionsList.get(i).partition();
-                       }
-               }
-
-               if (customPartitioner == null) {
-                       this.partitioner = new FixedPartitioner();
-               } else {
-                       this.partitioner = customPartitioner;
-               }
-       }
-
-       // ---------------------------------- Properties 
--------------------------
-
-       /**
-        * Defines whether the producer should fail on errors, or only log them.
-        * If this is set to true, then exceptions will be only logged, if set 
to false,
-        * exceptions will be eventually thrown and cause the streaming program 
to 
-        * fail (and enter recovery).
-        * 
-        * @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
-        */
-       public void setLogFailuresOnly(boolean logFailuresOnly) {
-               this.logFailuresOnly = logFailuresOnly;
-       }
-
-       // ----------------------------------- Utilities 
--------------------------
-       
-       /**
-        * Initializes the connection to Kafka.
-        */
-       @Override
-       public void open(Configuration configuration) {
-               producer = new 
org.apache.kafka.clients.producer.KafkaProducer<>(this.producerConfig);
-
-               RuntimeContext ctx = getRuntimeContext();
-               partitioner.open(ctx.getIndexOfThisSubtask(), 
ctx.getNumberOfParallelSubtasks(), partitions);
-
-               LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into 
topic {}", 
-                               ctx.getIndexOfThisSubtask(), 
ctx.getNumberOfParallelSubtasks(), topicId);
-               
-               if (logFailuresOnly) {
-                       callback = new Callback() {
-                               
-                               @Override
-                               public void onCompletion(RecordMetadata 
metadata, Exception e) {
-                                       if (e != null) {
-                                               LOG.error("Error while sending 
record to Kafka: " + e.getMessage(), e);
-                                       }
-                               }
-                       };
-               }
-               else {
-                       callback = new Callback() {
-                               @Override
-                               public void onCompletion(RecordMetadata 
metadata, Exception exception) {
-                                       if (exception != null && asyncException 
== null) {
-                                               asyncException = exception;
-                                       }
-                               }
-                       };
-               }
-       }
-
-       /**
-        * Called when new data arrives to the sink, and forwards it to Kafka.
-        *
-        * @param next
-        *              The incoming data
-        */
-       @Override
-       public void invoke(IN next) throws Exception {
-               // propagate asynchronous errors
-               checkErroneous();
-
-               byte[] serializedKey = schema.serializeKey(next);
-               byte[] serializedValue = schema.serializeValue(next);
-               ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(topicId,
-                               partitioner.partition(next, partitions.length),
-                               serializedKey, serializedValue);
-               
-               producer.send(record, callback);
-       }
-
-
-       @Override
-       public void close() throws Exception {
-               if (producer != null) {
-                       producer.close();
-               }
-               
-               // make sure we propagate pending errors
-               checkErroneous();
-       }
-
-
-       // ----------------------------------- Utilities 
--------------------------
-
-       private void checkErroneous() throws Exception {
-               Exception e = asyncException;
-               if (e != null) {
-                       // prevent double throwing
-                       asyncException = null;
-                       throw new Exception("Failed to send data to Kafka: " + 
e.getMessage(), e);
-               }
-       }
-       
-       public static Properties getPropertiesFromBrokerList(String brokerList) 
{
-               String[] elements = brokerList.split(",");
-               
-               // validate the broker addresses
-               for (String broker: elements) {
-                       NetUtils.getCorrectHostnamePort(broker);
-               }
-               
-               Properties props = new Properties();
-               props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokerList);
-               return props;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
deleted file mode 100644
index 4f1a2a6..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-/**
- * A fetcher pulls data from Kafka, from a fixed set of partitions.
- * The fetcher supports "seeking" inside the partitions, i.e., moving to a 
different offset.
- */
-public interface Fetcher {
-
-       /**
-        * Closes the fetcher. This will stop any operation in the
-        * {@link #run(SourceFunction.SourceContext, 
KeyedDeserializationSchema, HashMap)} method and eventually
-        * close underlying connections and release all resources.
-        */
-       void close() throws IOException;
-
-       /**
-        * Starts fetching data from Kafka and emitting it into the stream.
-        * 
-        * <p>To provide exactly once guarantees, the fetcher needs to emit a 
record and update
-        * the last consumed offset in one atomic operation:</p>
-        * <pre>{@code
-        * 
-        * while (running) {
-        *     T next = ...
-        *     long offset = ...
-        *     int partition = ...
-        *     synchronized (sourceContext.getCheckpointLock()) {
-        *         sourceContext.collect(next);
-        *         lastOffsets[partition] = offset;
-        *     }
-        * }
-        * }</pre>
-        *
-        * @param <T> The type of elements produced by the fetcher and emitted 
to the source context.
-        * @param sourceContext The source context to emit elements to.
-        * @param valueDeserializer The deserializer to decode the raw values 
with.
-        * @param lastOffsets The map into which to store the offsets for which 
elements are emitted (operator state)
-        */
-       <T> void run(SourceFunction.SourceContext<T> sourceContext, 
KeyedDeserializationSchema<T> valueDeserializer,
-                               HashMap<KafkaTopicPartition, Long> lastOffsets) 
throws Exception;
-       
-       /**
-        * Set the next offset to read from for the given partition.
-        * For example, if the partition <i>i</i> offset is set to <i>n</i>, 
the Fetcher's next result
-        * will be the message with <i>offset=n</i>.
-        * 
-        * @param topicPartition The partition for which to seek the offset.
-        * @param offsetToRead The offset to seek to.
-        */
-       void seek(KafkaTopicPartition topicPartition, long offsetToRead);
-
-       /**
-        * Exit run loop with given error and release all resources.
-        *
-        * @param t Error cause
-        */
-       void stopWithError(Throwable t);
-}

Reply via email to