http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
deleted file mode 100644
index 25040eb..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internal;
-
-import 
org.apache.flink.streaming.connectors.kafka.internal.Handover.WakeupException;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Random;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-/**
- * Tests for the {@link Handover} between Kafka Consumer Thread and the 
fetcher's main thread. 
- */
-public class HandoverTest {
-
-       // 
------------------------------------------------------------------------
-       //  test produce / consumer
-       // 
------------------------------------------------------------------------
-
-       @Test
-       public void testWithVariableProducer() throws Exception {
-               runProducerConsumerTest(500, 2, 0);
-       }
-
-       @Test
-       public void testWithVariableConsumer() throws Exception {
-               runProducerConsumerTest(500, 0, 2);
-       }
-
-       @Test
-       public void testWithVariableBoth() throws Exception {
-               runProducerConsumerTest(500, 2, 2);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  test error propagation
-       // 
------------------------------------------------------------------------
-
-       @Test
-       public void testPublishErrorOnEmptyHandover() throws Exception {
-               final Handover handover = new Handover();
-
-               Exception error = new Exception();
-               handover.reportError(error);
-
-               try {
-                       handover.pollNext();
-                       fail("should throw an exception");
-               }
-               catch (Exception e) {
-                       assertEquals(error, e);
-               }
-       }
-
-       @Test
-       public void testPublishErrorOnFullHandover() throws Exception {
-               final Handover handover = new Handover();
-               handover.produce(createTestRecords());
-
-               IOException error = new IOException();
-               handover.reportError(error);
-
-               try {
-                       handover.pollNext();
-                       fail("should throw an exception");
-               }
-               catch (Exception e) {
-                       assertEquals(error, e);
-               }
-       }
-
-       @Test
-       public void testExceptionMarksClosedOnEmpty() throws Exception {
-               final Handover handover = new Handover();
-
-               IllegalStateException error = new IllegalStateException();
-               handover.reportError(error);
-
-               try {
-                       handover.produce(createTestRecords());
-                       fail("should throw an exception");
-               }
-               catch (Handover.ClosedException e) {
-                       // expected
-               }
-       }
-
-       @Test
-       public void testExceptionMarksClosedOnFull() throws Exception {
-               final Handover handover = new Handover();
-               handover.produce(createTestRecords());
-
-               LinkageError error = new LinkageError();
-               handover.reportError(error);
-
-               try {
-                       handover.produce(createTestRecords());
-                       fail("should throw an exception");
-               }
-               catch (Handover.ClosedException e) {
-                       // expected
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  test closing behavior
-       // 
------------------------------------------------------------------------
-
-       @Test
-       public void testCloseEmptyForConsumer() throws Exception {
-               final Handover handover = new Handover();
-               handover.close();
-
-               try {
-                       handover.pollNext();
-                       fail("should throw an exception");
-               }
-               catch (Handover.ClosedException e) {
-                       // expected
-               }
-       }
-
-       @Test
-       public void testCloseFullForConsumer() throws Exception {
-               final Handover handover = new Handover();
-               handover.produce(createTestRecords());
-               handover.close();
-
-               try {
-                       handover.pollNext();
-                       fail("should throw an exception");
-               }
-               catch (Handover.ClosedException e) {
-                       // expected
-               }
-       }
-
-       @Test
-       public void testCloseEmptyForProducer() throws Exception {
-               final Handover handover = new Handover();
-               handover.close();
-
-               try {
-                       handover.produce(createTestRecords());
-                       fail("should throw an exception");
-               }
-               catch (Handover.ClosedException e) {
-                       // expected
-               }
-       }
-
-       @Test
-       public void testCloseFullForProducer() throws Exception {
-               final Handover handover = new Handover();
-               handover.produce(createTestRecords());
-               handover.close();
-
-               try {
-                       handover.produce(createTestRecords());
-                       fail("should throw an exception");
-               }
-               catch (Handover.ClosedException e) {
-                       // expected
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  test wake up behavior
-       // 
------------------------------------------------------------------------
-
-       @Test
-       public void testWakeupDoesNotWakeWhenEmpty() throws Exception {
-               Handover handover = new Handover();
-               handover.wakeupProducer();
-
-               // produce into a woken but empty handover
-               try {
-                       handover.produce(createTestRecords());
-               }
-               catch (Handover.WakeupException e) {
-                       fail();
-               }
-
-               // handover now has records, next time we wakeup and produce it 
needs
-               // to throw an exception
-               handover.wakeupProducer();
-               try {
-                       handover.produce(createTestRecords());
-                       fail("should throw an exception");
-               }
-               catch (Handover.WakeupException e) {
-                       // expected
-               }
-
-               // empty the handover
-               assertNotNull(handover.pollNext());
-               
-               // producing into an empty handover should work
-               try {
-                       handover.produce(createTestRecords());
-               }
-               catch (Handover.WakeupException e) {
-                       fail();
-               }
-       }
-
-       @Test
-       public void testWakeupWakesOnlyOnce() throws Exception {
-               // create a full handover
-               final Handover handover = new Handover();
-               handover.produce(createTestRecords());
-
-               handover.wakeupProducer();
-
-               try {
-                       handover.produce(createTestRecords());
-                       fail();
-               } catch (WakeupException e) {
-                       // expected
-               }
-
-               CheckedThread producer = new CheckedThread() {
-                       @Override
-                       public void go() throws Exception {
-                               handover.produce(createTestRecords());
-                       }
-               };
-               producer.start();
-
-               // the producer must go blocking
-               producer.waitUntilThreadHoldsLock(10000);
-
-               // release the thread by consuming something
-               assertNotNull(handover.pollNext());
-               producer.sync();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  utilities
-       // 
------------------------------------------------------------------------
-
-       private void runProducerConsumerTest(int numRecords, int 
maxProducerDelay, int maxConsumerDelay) throws Exception {
-               // generate test data
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final ConsumerRecords<byte[], byte[]>[] data = new 
ConsumerRecords[numRecords];
-               for (int i = 0; i < numRecords; i++) {
-                       data[i] = createTestRecords();
-               }
-
-               final Handover handover = new Handover();
-
-               ProducerThread producer = new ProducerThread(handover, data, 
maxProducerDelay);
-               ConsumerThread consumer = new ConsumerThread(handover, data, 
maxConsumerDelay);
-
-               consumer.start();
-               producer.start();
-
-               // sync first on the consumer, so it propagates assertion errors
-               consumer.sync();
-               producer.sync();
-       }
-
-       @SuppressWarnings("unchecked")
-       private static ConsumerRecords<byte[], byte[]> createTestRecords() {
-               return mock(ConsumerRecords.class);
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private static abstract class CheckedThread extends Thread {
-
-               private volatile Throwable error;
-
-               public abstract void go() throws Exception;
-
-               @Override
-               public void run() {
-                       try {
-                               go();
-                       }
-                       catch (Throwable t) {
-                               error = t;
-                       }
-               }
-
-               public void sync() throws Exception {
-                       join();
-                       if (error != null) {
-                               ExceptionUtils.rethrowException(error, 
error.getMessage());
-                       }
-               }
-
-               public void waitUntilThreadHoldsLock(long timeoutMillis) throws 
InterruptedException, TimeoutException {
-                       final long deadline = System.nanoTime() + timeoutMillis 
* 1_000_000;
-                       
-                       while (!isBlockedOrWaiting() && (System.nanoTime() < 
deadline)) {
-                               Thread.sleep(1);
-                       }
-
-                       if (!isBlockedOrWaiting()) {
-                               throw new TimeoutException();
-                       }
-               }
-
-               private boolean isBlockedOrWaiting() {
-                       State state = getState();
-                       return state == State.BLOCKED || state == State.WAITING 
|| state == State.TIMED_WAITING;
-               }
-       }
-
-       private static class ProducerThread extends CheckedThread {
-
-               private final Random rnd = new Random();
-               private final Handover handover;
-               private final ConsumerRecords<byte[], byte[]>[] data;
-               private final int maxDelay;
-
-               private ProducerThread(Handover handover, 
ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
-                       this.handover = handover;
-                       this.data = data;
-                       this.maxDelay = maxDelay;
-               }
-
-               @Override
-               public void go() throws Exception {
-                       for (ConsumerRecords<byte[], byte[]> rec : data) {
-                               handover.produce(rec);
-
-                               if (maxDelay > 0) {
-                                       int delay = rnd.nextInt(maxDelay);
-                                       Thread.sleep(delay);
-                               }
-                       }
-               }
-       }
-
-       private static class ConsumerThread extends CheckedThread {
-
-               private final Random rnd = new Random();
-               private final Handover handover;
-               private final ConsumerRecords<byte[], byte[]>[] data;
-               private final int maxDelay;
-
-               private ConsumerThread(Handover handover, 
ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
-                       this.handover = handover;
-                       this.data = data;
-                       this.maxDelay = maxDelay;
-               }
-
-               @Override
-               public void go() throws Exception {
-                       for (ConsumerRecords<byte[], byte[]> rec : data) {
-                               ConsumerRecords<byte[], byte[]> next = 
handover.pollNext();
-
-                               assertEquals(rec, next);
-
-                               if (maxDelay > 0) {
-                                       int delay = rnd.nextInt(maxDelay);
-                                       Thread.sleep(delay);
-                               }
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
deleted file mode 100644
index 4ac1773..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,32 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-log4j.logger.org.apache.zookeeper=OFF, testlogger
-log4j.logger.state.change.logger=OFF, testlogger
-log4j.logger.kafka=OFF, testlogger
-
-log4j.logger.org.apache.directory=OFF, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-    <logger name="org.apache.flink.streaming" level="WARN"/>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml 
b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
deleted file mode 100644
index ef71bde..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
+++ /dev/null
@@ -1,212 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0";
-                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-streaming-connectors</artifactId>
-               <version>1.2-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-connector-kafka-base_2.10</artifactId>
-       <name>flink-connector-kafka-base</name>
-
-       <packaging>jar</packaging>
-
-       <!-- Allow users to pass custom connector versions -->
-       <properties>
-               <kafka.version>0.8.2.2</kafka.version>
-       </properties>
-
-       <dependencies>
-
-               <!-- core dependencies -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-                       <!-- Projects depending on this project,
-                       won't depend on flink-table. -->
-                       <optional>true</optional>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.kafka</groupId>
-                       <artifactId>kafka_${scala.binary.version}</artifactId>
-                       <version>${kafka.version}</version>
-                       <exclusions>
-                               <exclusion>
-                                       <groupId>com.sun.jmx</groupId>
-                                       <artifactId>jmxri</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>com.sun.jdmk</groupId>
-                                       <artifactId>jmxtools</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>log4j</groupId>
-                                       <artifactId>log4j</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.slf4j</groupId>
-                                       <artifactId>slf4j-simple</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>net.sf.jopt-simple</groupId>
-                                       <artifactId>jopt-simple</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.scala-lang</groupId>
-                                       <artifactId>scala-reflect</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.scala-lang</groupId>
-                                       <artifactId>scala-compiler</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>com.yammer.metrics</groupId>
-                                       
<artifactId>metrics-annotation</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.xerial.snappy</groupId>
-                                       <artifactId>snappy-java</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-
-               <!-- test dependencies -->
-               
-               <!-- force using the latest zkclient -->
-               <dependency>
-                       <groupId>com.101tec</groupId>
-                       <artifactId>zkclient</artifactId>
-                       <version>0.7</version>
-                       <type>jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-
-               <dependency>
-                       <groupId>org.apache.curator</groupId>
-                       <artifactId>curator-test</artifactId>
-                       <version>${curator.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-metrics-jmx</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-tests_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-runtime_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.hadoop</groupId>
-                       <artifactId>hadoop-minikdc</artifactId>
-                       <version>${minikdc.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-       </dependencies>
-
-       <dependencyManagement>
-               <dependencies>
-                       <dependency>
-                               <groupId>com.101tec</groupId>
-                               <artifactId>zkclient</artifactId>
-                               <version>0.7</version>
-                       </dependency>
-               </dependencies>
-       </dependencyManagement>
-       
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-jar-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <goals>
-                                                       <goal>test-jar</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                       </plugin>
-                       <!--
-            https://issues.apache.org/jira/browse/DIRSHARED-134
-            Required to pull the Mini-KDC transitive dependency
-            -->
-                       <plugin>
-                               <groupId>org.apache.felix</groupId>
-                               <artifactId>maven-bundle-plugin</artifactId>
-                               <version>3.0.1</version>
-                               <inherited>true</inherited>
-                               <extensions>true</extensions>
-                       </plugin>
-               </plugins>
-       </build>
-       
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
deleted file mode 100644
index aef7116..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.commons.collections.map.LinkedMap;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.SerializedValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Base class of all Flink Kafka Consumer data sources.
- * This implements the common behavior across all Kafka versions.
- * 
- * <p>The Kafka version specific behavior is defined mainly in the specific 
subclasses of the
- * {@link AbstractFetcher}.
- * 
- * @param <T> The type of records produced by this data source
- */
-public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFunction<T> implements 
-               CheckpointListener,
-               ResultTypeQueryable<T>,
-               CheckpointedFunction {
-       private static final long serialVersionUID = -6272159445203409112L;
-
-       protected static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
-       
-       /** The maximum number of pending non-committed checkpoints to track, 
to avoid memory leaks */
-       public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
-
-       /** Boolean configuration key to disable metrics tracking **/
-       public static final String KEY_DISABLE_METRICS = 
"flink.disable-metrics";
-
-       // 
------------------------------------------------------------------------
-       //  configuration state, set on the client relevant for all subtasks
-       // 
------------------------------------------------------------------------
-
-       private final List<String> topics;
-       
-       /** The schema to convert between Kafka's byte messages, and Flink's 
objects */
-       protected final KeyedDeserializationSchema<T> deserializer;
-
-       /** The set of topic partitions that the source will read */
-       protected List<KafkaTopicPartition> subscribedPartitions;
-       
-       /** Optional timestamp extractor / watermark generator that will be run 
per Kafka partition,
-        * to exploit per-partition timestamp characteristics.
-        * The assigner is kept in serialized form, to deserialize it into 
multiple copies */
-       private SerializedValue<AssignerWithPeriodicWatermarks<T>> 
periodicWatermarkAssigner;
-       
-       /** Optional timestamp extractor / watermark generator that will be run 
per Kafka partition,
-        * to exploit per-partition timestamp characteristics. 
-        * The assigner is kept in serialized form, to deserialize it into 
multiple copies */
-       private SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
punctuatedWatermarkAssigner;
-
-       private transient ListState<Tuple2<KafkaTopicPartition, Long>> 
offsetsStateForCheckpoint;
-
-       // 
------------------------------------------------------------------------
-       //  runtime state (used individually by each parallel subtask) 
-       // 
------------------------------------------------------------------------
-       
-       /** Data for pending but uncommitted offsets */
-       private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
-
-       /** The fetcher implements the connections to the Kafka brokers */
-       private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
-       
-       /** The offsets to restore to, if the consumer restores state from a 
checkpoint */
-       private transient volatile HashMap<KafkaTopicPartition, Long> 
restoreToOffset;
-       
-       /** Flag indicating whether the consumer is still running **/
-       private volatile boolean running = true;
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Base constructor.
-        *
-        * @param deserializer
-        *           The deserializer to turn raw byte messages into Java/Scala 
objects.
-        */
-       public FlinkKafkaConsumerBase(List<String> topics, 
KeyedDeserializationSchema<T> deserializer) {
-               this.topics = checkNotNull(topics);
-               checkArgument(topics.size() > 0, "You have to define at least 
one topic.");
-               this.deserializer = checkNotNull(deserializer, 
"valueDeserializer");
-       }
-
-       /**
-        * This method must be called from the subclasses, to set the list of 
all subscribed partitions
-        * that this consumer will fetch from (across all subtasks).
-        * 
-        * @param allSubscribedPartitions The list of all partitions that all 
subtasks together should fetch from.
-        */
-       protected void setSubscribedPartitions(List<KafkaTopicPartition> 
allSubscribedPartitions) {
-               checkNotNull(allSubscribedPartitions);
-               this.subscribedPartitions = 
Collections.unmodifiableList(allSubscribedPartitions);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Configuration
-       // 
------------------------------------------------------------------------
-       
-       /**
-        * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit 
watermarks in a punctuated manner.
-        * The watermark extractor will run per Kafka partition, watermarks 
will be merged across partitions
-        * in the same way as in the Flink runtime, when streams are merged.
-        * 
-        * <p>When a subtask of a FlinkKafkaConsumer source reads multiple 
Kafka partitions,
-        * the streams from the partitions are unioned in a "first come first 
serve" fashion. Per-partition
-        * characteristics are usually lost that way. For example, if the 
timestamps are strictly ascending
-        * per Kafka partition, they will not be strictly ascending in the 
resulting Flink DataStream, if the
-        * parallel source subtask reads more that one partition.
-        * 
-        * <p>Running timestamp extractors / watermark generators directly 
inside the Kafka source, per Kafka
-        * partition, allows users to let them exploit the per-partition 
characteristics.
-        * 
-        * <p>Note: One can use either an {@link 
AssignerWithPunctuatedWatermarks} or an
-        * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
-        * 
-        * @param assigner The timestamp assigner / watermark generator to use.
-        * @return The consumer object, to allow function chaining.   
-        */
-       public FlinkKafkaConsumerBase<T> 
assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
-               checkNotNull(assigner);
-               
-               if (this.periodicWatermarkAssigner != null) {
-                       throw new IllegalStateException("A periodic watermark 
emitter has already been set.");
-               }
-               try {
-                       ClosureCleaner.clean(assigner, true);
-                       this.punctuatedWatermarkAssigner = new 
SerializedValue<>(assigner);
-                       return this;
-               } catch (Exception e) {
-                       throw new IllegalArgumentException("The given assigner 
is not serializable", e);
-               }
-       }
-
-       /**
-        * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit 
watermarks in a punctuated manner.
-        * The watermark extractor will run per Kafka partition, watermarks 
will be merged across partitions
-        * in the same way as in the Flink runtime, when streams are merged.
-        *
-        * <p>When a subtask of a FlinkKafkaConsumer source reads multiple 
Kafka partitions,
-        * the streams from the partitions are unioned in a "first come first 
serve" fashion. Per-partition
-        * characteristics are usually lost that way. For example, if the 
timestamps are strictly ascending
-        * per Kafka partition, they will not be strictly ascending in the 
resulting Flink DataStream, if the
-        * parallel source subtask reads more that one partition.
-        *
-        * <p>Running timestamp extractors / watermark generators directly 
inside the Kafka source, per Kafka
-        * partition, allows users to let them exploit the per-partition 
characteristics.
-        *
-        * <p>Note: One can use either an {@link 
AssignerWithPunctuatedWatermarks} or an
-        * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
-        *
-        * @param assigner The timestamp assigner / watermark generator to use.
-        * @return The consumer object, to allow function chaining.   
-        */
-       public FlinkKafkaConsumerBase<T> 
assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
-               checkNotNull(assigner);
-               
-               if (this.punctuatedWatermarkAssigner != null) {
-                       throw new IllegalStateException("A punctuated watermark 
emitter has already been set.");
-               }
-               try {
-                       ClosureCleaner.clean(assigner, true);
-                       this.periodicWatermarkAssigner = new 
SerializedValue<>(assigner);
-                       return this;
-               } catch (Exception e) {
-                       throw new IllegalArgumentException("The given assigner 
is not serializable", e);
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Work methods
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void run(SourceContext<T> sourceContext) throws Exception {
-               if (subscribedPartitions == null) {
-                       throw new Exception("The partitions were not set for 
the consumer");
-               }
-
-               // we need only do work, if we actually have partitions assigned
-               if (!subscribedPartitions.isEmpty()) {
-
-                       // (1) create the fetcher that will communicate with 
the Kafka brokers
-                       final AbstractFetcher<T, ?> fetcher = createFetcher(
-                                       sourceContext, subscribedPartitions,
-                                       periodicWatermarkAssigner, 
punctuatedWatermarkAssigner,
-                                       (StreamingRuntimeContext) 
getRuntimeContext());
-
-                       // (2) set the fetcher to the restored checkpoint 
offsets
-                       if (restoreToOffset != null) {
-                               fetcher.restoreOffsets(restoreToOffset);
-                       }
-
-                       // publish the reference, for snapshot-, commit-, and 
cancel calls
-                       // IMPORTANT: We can only do that now, because only now 
will calls to
-                       //            the fetchers 'snapshotCurrentState()' 
method return at least
-                       //            the restored offsets
-                       this.kafkaFetcher = fetcher;
-                       if (!running) {
-                               return;
-                       }
-                       
-                       // (3) run the fetcher' main work method
-                       fetcher.runFetchLoop();
-               }
-               else {
-                       // this source never completes, so emit a 
Long.MAX_VALUE watermark
-                       // to not block watermark forwarding
-                       sourceContext.emitWatermark(new 
Watermark(Long.MAX_VALUE));
-
-                       // wait until this is canceled
-                       final Object waitLock = new Object();
-                       while (running) {
-                               try {
-                                       //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
-                                       synchronized (waitLock) {
-                                               waitLock.wait();
-                                       }
-                               }
-                               catch (InterruptedException e) {
-                                       if (!running) {
-                                               // restore the interrupted 
state, and fall through the loop
-                                               
Thread.currentThread().interrupt();
-                                       }
-                               }
-                       }
-               }
-       }
-
-       @Override
-       public void cancel() {
-               // set ourselves as not running
-               running = false;
-               
-               // abort the fetcher, if there is one
-               if (kafkaFetcher != null) {
-                       kafkaFetcher.cancel();
-               }
-
-               // there will be an interrupt() call to the main thread anyways
-       }
-
-       @Override
-       public void open(Configuration configuration) {
-               List<KafkaTopicPartition> kafkaTopicPartitions = 
getKafkaPartitions(topics);
-
-               if (kafkaTopicPartitions != null) {
-                       assignTopicPartitions(kafkaTopicPartitions);
-               }
-       }
-
-       @Override
-       public void close() throws Exception {
-               // pretty much the same logic as cancelling
-               try {
-                       cancel();
-               } finally {
-                       super.close();
-               }
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  Checkpoint and restore
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void initializeState(FunctionInitializationContext context) 
throws Exception {
-
-               OperatorStateStore stateStore = context.getOperatorStateStore();
-               offsetsStateForCheckpoint = 
stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
-
-               if (context.isRestored()) {
-                       restoreToOffset = new HashMap<>();
-                       for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : 
offsetsStateForCheckpoint.get()) {
-                               restoreToOffset.put(kafkaOffset.f0, 
kafkaOffset.f1);
-                       }
-
-                       LOG.info("Setting restore state in the 
FlinkKafkaConsumer.");
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Using the following offsets: {}", 
restoreToOffset);
-                       }
-               } else {
-                       LOG.info("No restore state for FlinkKafkaConsumer.");
-               }
-       }
-
-       @Override
-       public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
-               if (!running) {
-                       LOG.debug("snapshotState() called on closed source");
-               } else {
-
-                       offsetsStateForCheckpoint.clear();
-
-                       final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
-                       if (fetcher == null) {
-                               // the fetcher has not yet been initialized, 
which means we need to return the
-                               // originally restored offsets or the assigned 
partitions
-
-                               if (restoreToOffset != null) {
-
-                                       for (Map.Entry<KafkaTopicPartition, 
Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
-                                               offsetsStateForCheckpoint.add(
-                                                               
Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), 
kafkaTopicPartitionLongEntry.getValue()));
-                                       }
-                               } else if (subscribedPartitions != null) {
-                                       for (KafkaTopicPartition 
subscribedPartition : subscribedPartitions) {
-                                               offsetsStateForCheckpoint.add(
-                                                               
Tuple2.of(subscribedPartition, KafkaTopicPartitionState.OFFSET_NOT_SET));
-                                       }
-                               }
-
-                               // the map cannot be asynchronously updated, 
because only one checkpoint call can happen
-                               // on this function at a time: either 
snapshotState() or notifyCheckpointComplete()
-                               
pendingOffsetsToCommit.put(context.getCheckpointId(), restoreToOffset);
-                       } else {
-                               HashMap<KafkaTopicPartition, Long> 
currentOffsets = fetcher.snapshotCurrentState();
-
-                               // the map cannot be asynchronously updated, 
because only one checkpoint call can happen
-                               // on this function at a time: either 
snapshotState() or notifyCheckpointComplete()
-                               
pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
-
-                               for (Map.Entry<KafkaTopicPartition, Long> 
kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
-                                       offsetsStateForCheckpoint.add(
-                                                       
Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), 
kafkaTopicPartitionLongEntry.getValue()));
-                               }
-                       }
-
-                       // truncate the map of pending offsets to commit, to 
prevent infinite growth
-                       while (pendingOffsetsToCommit.size() > 
MAX_NUM_PENDING_CHECKPOINTS) {
-                               pendingOffsetsToCommit.remove(0);
-                       }
-               }
-       }
-
-       @Override
-       public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
-               if (!running) {
-                       LOG.debug("notifyCheckpointComplete() called on closed 
source");
-                       return;
-               }
-
-               final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
-               if (fetcher == null) {
-                       LOG.debug("notifyCheckpointComplete() called on 
uninitialized source");
-                       return;
-               }
-               
-               // only one commit operation must be in progress
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Committing offsets to Kafka/ZooKeeper for 
checkpoint " + checkpointId);
-               }
-
-               try {
-                       final int posInMap = 
pendingOffsetsToCommit.indexOf(checkpointId);
-                       if (posInMap == -1) {
-                               LOG.warn("Received confirmation for unknown 
checkpoint id {}", checkpointId);
-                               return;
-                       }
-
-                       @SuppressWarnings("unchecked")
-                       HashMap<KafkaTopicPartition, Long> offsets =
-                                       (HashMap<KafkaTopicPartition, Long>) 
pendingOffsetsToCommit.remove(posInMap);
-
-                       // remove older checkpoints in map
-                       for (int i = 0; i < posInMap; i++) {
-                               pendingOffsetsToCommit.remove(0);
-                       }
-
-                       if (offsets == null || offsets.size() == 0) {
-                               LOG.debug("Checkpoint state was empty.");
-                               return;
-                       }
-                       fetcher.commitInternalOffsetsToKafka(offsets);
-               }
-               catch (Exception e) {
-                       if (running) {
-                               throw e;
-                       }
-                       // else ignore exception if we are no longer running
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Kafka Consumer specific methods
-       // 
------------------------------------------------------------------------
-       
-       /**
-        * Creates the fetcher that connect to the Kafka brokers, pulls data, 
deserialized the
-        * data, and emits it into the data streams.
-        * 
-        * @param sourceContext The source context to emit data to.
-        * @param thisSubtaskPartitions The set of partitions that this subtask 
should handle.
-        * @param watermarksPeriodic Optional, a serialized timestamp extractor 
/ periodic watermark generator.
-        * @param watermarksPunctuated Optional, a serialized timestamp 
extractor / punctuated watermark generator.
-        * @param runtimeContext The task's runtime context.
-        * 
-        * @return The instantiated fetcher
-        * 
-        * @throws Exception The method should forward exceptions
-        */
-       protected abstract AbstractFetcher<T, ?> createFetcher(
-                       SourceContext<T> sourceContext,
-                       List<KafkaTopicPartition> thisSubtaskPartitions,
-                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
-                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
-                       StreamingRuntimeContext runtimeContext) throws 
Exception;
-
-       protected abstract List<KafkaTopicPartition> 
getKafkaPartitions(List<String> topics);
-       
-       // 
------------------------------------------------------------------------
-       //  ResultTypeQueryable methods 
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public TypeInformation<T> getProducedType() {
-               return deserializer.getProducedType();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       private void assignTopicPartitions(List<KafkaTopicPartition> 
kafkaTopicPartitions) {
-               subscribedPartitions = new ArrayList<>();
-
-               if (restoreToOffset != null) {
-                       for (KafkaTopicPartition kafkaTopicPartition : 
kafkaTopicPartitions) {
-                               if 
(restoreToOffset.containsKey(kafkaTopicPartition)) {
-                                       
subscribedPartitions.add(kafkaTopicPartition);
-                               }
-                       }
-               } else {
-                       Collections.sort(kafkaTopicPartitions, new 
Comparator<KafkaTopicPartition>() {
-                               @Override
-                               public int compare(KafkaTopicPartition o1, 
KafkaTopicPartition o2) {
-                                       int topicComparison = 
o1.getTopic().compareTo(o2.getTopic());
-
-                                       if (topicComparison == 0) {
-                                               return o1.getPartition() - 
o2.getPartition();
-                                       } else {
-                                               return topicComparison;
-                                       }
-                               }
-                       });
-
-                       for (int i = 
getRuntimeContext().getIndexOfThisSubtask(); i < kafkaTopicPartitions.size(); i 
+= getRuntimeContext().getNumberOfParallelSubtasks()) {
-                               
subscribedPartitions.add(kafkaTopicPartitions.get(i));
-                       }
-               }
-       }
-
-       /**
-        * Selects which of the given partitions should be handled by a 
specific consumer,
-        * given a certain number of consumers.
-        * 
-        * @param allPartitions The partitions to select from
-        * @param numConsumers The number of consumers
-        * @param consumerIndex The index of the specific consumer
-        * 
-        * @return The sublist of partitions to be handled by that consumer.
-        */
-       protected static List<KafkaTopicPartition> assignPartitions(
-                       List<KafkaTopicPartition> allPartitions,
-                       int numConsumers, int consumerIndex) {
-               final List<KafkaTopicPartition> thisSubtaskPartitions = new 
ArrayList<>(
-                               allPartitions.size() / numConsumers + 1);
-
-               for (int i = 0; i < allPartitions.size(); i++) {
-                       if (i % numConsumers == consumerIndex) {
-                               thisSubtaskPartitions.add(allPartitions.get(i));
-                       }
-               }
-               
-               return thisSubtaskPartitions;
-       }
-       
-       /**
-        * Logs the partition information in INFO level.
-        * 
-        * @param logger The logger to log to.
-        * @param partitionInfos List of subscribed partitions
-        */
-       protected static void logPartitionInfo(Logger logger, 
List<KafkaTopicPartition> partitionInfos) {
-               Map<String, Integer> countPerTopic = new HashMap<>();
-               for (KafkaTopicPartition partition : partitionInfos) {
-                       Integer count = countPerTopic.get(partition.getTopic());
-                       if (count == null) {
-                               count = 1;
-                       } else {
-                               count++;
-                       }
-                       countPerTopic.put(partition.getTopic(), count);
-               }
-               StringBuilder sb = new StringBuilder(
-                               "Consumer is going to read the following topics 
(with number of partitions): ");
-               
-               for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) {
-                       sb.append(e.getKey()).append(" 
(").append(e.getValue()).append("), ");
-               }
-               
-               logger.info(sb.toString());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
deleted file mode 100644
index d413f1c..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.runtime.util.SerializableObject;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Collections;
-import java.util.Comparator;
-
-import static java.util.Objects.requireNonNull;
-
-
-/**
- * Flink Sink to produce data into a Kafka topic.
- *
- * Please note that this producer provides at-least-once reliability 
guarantees when
- * checkpoints are enabled and setFlushOnCheckpoint(true) is set.
- * Otherwise, the producer doesn't provide any reliability guarantees.
- *
- * @param <IN> Type of the messages to write into Kafka.
- */
-public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> 
implements CheckpointedFunction {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
-
-       private static final long serialVersionUID = 1L;
-
-       /**
-        * Configuration key for disabling the metrics reporting
-        */
-       public static final String KEY_DISABLE_METRICS = 
"flink.disable-metrics";
-
-       /**
-        * Array with the partition ids of the given defaultTopicId
-        * The size of this array is the number of partitions
-        */
-       protected int[] partitions;
-
-       /**
-        * User defined properties for the Producer
-        */
-       protected final Properties producerConfig;
-
-       /**
-        * The name of the default topic this producer is writing data to
-        */
-       protected final String defaultTopicId;
-
-       /**
-        * (Serializable) SerializationSchema for turning objects used with 
Flink into
-        * byte[] for Kafka.
-        */
-       protected final KeyedSerializationSchema<IN> schema;
-
-       /**
-        * User-provided partitioner for assigning an object to a Kafka 
partition.
-        */
-       protected final KafkaPartitioner<IN> partitioner;
-
-       /**
-        * Flag indicating whether to accept failures (and log them), or to 
fail on failures
-        */
-       protected boolean logFailuresOnly;
-
-       /**
-        * If true, the producer will wait until all outstanding records have 
been send to the broker.
-        */
-       protected boolean flushOnCheckpoint;
-       
-       // -------------------------------- Runtime fields 
------------------------------------------
-
-       /** KafkaProducer instance */
-       protected transient KafkaProducer<byte[], byte[]> producer;
-
-       /** The callback than handles error propagation or logging callbacks */
-       protected transient Callback callback;
-
-       /** Errors encountered in the async producer are stored here */
-       protected transient volatile Exception asyncException;
-
-       /** Lock for accessing the pending records */
-       protected final SerializableObject pendingRecordsLock = new 
SerializableObject();
-
-       /** Number of unacknowledged records. */
-       protected long pendingRecords;
-
-       protected OperatorStateStore stateStore;
-
-
-       /**
-        * The main constructor for creating a FlinkKafkaProducer.
-        *
-        * @param defaultTopicId The default topic to write data to
-        * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
-        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions. Passing null will use Kafka's partitioner
-        */
-       public FlinkKafkaProducerBase(String defaultTopicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
KafkaPartitioner<IN> customPartitioner) {
-               requireNonNull(defaultTopicId, "TopicID not set");
-               requireNonNull(serializationSchema, "serializationSchema not 
set");
-               requireNonNull(producerConfig, "producerConfig not set");
-               ClosureCleaner.clean(customPartitioner, true);
-               ClosureCleaner.ensureSerializable(serializationSchema);
-
-               this.defaultTopicId = defaultTopicId;
-               this.schema = serializationSchema;
-               this.producerConfig = producerConfig;
-
-               // set the producer configuration properties for kafka record 
key value serializers.
-               if 
(!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
-                       
this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getCanonicalName());
-               } else {
-                       LOG.warn("Overwriting the '{}' is not recommended", 
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
-               }
-
-               if 
(!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
-                       
this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getCanonicalName());
-               } else {
-                       LOG.warn("Overwriting the '{}' is not recommended", 
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
-               }
-
-               // eagerly ensure that bootstrap servers are set.
-               if 
(!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
-                       throw new 
IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be 
supplied in the producer config properties.");
-               }
-
-               this.partitioner = customPartitioner;
-       }
-
-       // ---------------------------------- Properties 
--------------------------
-
-       /**
-        * Defines whether the producer should fail on errors, or only log them.
-        * If this is set to true, then exceptions will be only logged, if set 
to false,
-        * exceptions will be eventually thrown and cause the streaming program 
to 
-        * fail (and enter recovery).
-        * 
-        * @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
-        */
-       public void setLogFailuresOnly(boolean logFailuresOnly) {
-               this.logFailuresOnly = logFailuresOnly;
-       }
-
-       /**
-        * If set to true, the Flink producer will wait for all outstanding 
messages in the Kafka buffers
-        * to be acknowledged by the Kafka producer on a checkpoint.
-        * This way, the producer can guarantee that messages in the Kafka 
buffers are part of the checkpoint.
-        *
-        * @param flush Flag indicating the flushing mode (true = flush on 
checkpoint)
-        */
-       public void setFlushOnCheckpoint(boolean flush) {
-               this.flushOnCheckpoint = flush;
-       }
-
-       /**
-        * Used for testing only
-        */
-       protected <K,V> KafkaProducer<K,V> getKafkaProducer(Properties props) {
-               return new KafkaProducer<>(props);
-       }
-
-       // ----------------------------------- Utilities 
--------------------------
-       
-       /**
-        * Initializes the connection to Kafka.
-        */
-       @Override
-       public void open(Configuration configuration) {
-               producer = getKafkaProducer(this.producerConfig);
-
-               // the fetched list is immutable, so we're creating a mutable 
copy in order to sort it
-               List<PartitionInfo> partitionsList = new 
ArrayList<>(producer.partitionsFor(defaultTopicId));
-
-               // sort the partitions by partition id to make sure the fetched 
partition list is the same across subtasks
-               Collections.sort(partitionsList, new 
Comparator<PartitionInfo>() {
-                       @Override
-                       public int compare(PartitionInfo o1, PartitionInfo o2) {
-                               return Integer.compare(o1.partition(), 
o2.partition());
-                       }
-               });
-
-               partitions = new int[partitionsList.size()];
-               for (int i = 0; i < partitions.length; i++) {
-                       partitions[i] = partitionsList.get(i).partition();
-               }
-
-               RuntimeContext ctx = getRuntimeContext();
-               if (partitioner != null) {
-                       partitioner.open(ctx.getIndexOfThisSubtask(), 
ctx.getNumberOfParallelSubtasks(), partitions);
-               }
-
-               LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into 
topic {}", 
-                               ctx.getIndexOfThisSubtask() + 1, 
ctx.getNumberOfParallelSubtasks(), defaultTopicId);
-
-               // register Kafka metrics to Flink accumulators
-               if 
(!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, 
"false"))) {
-                       Map<MetricName, ? extends Metric> metrics = 
this.producer.metrics();
-
-                       if (metrics == null) {
-                               // MapR's Kafka implementation returns null 
here.
-                               LOG.info("Producer implementation does not 
support metrics");
-                       } else {
-                               final MetricGroup kafkaMetricGroup = 
getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
-                               for (Map.Entry<MetricName, ? extends Metric> 
metric: metrics.entrySet()) {
-                                       
kafkaMetricGroup.gauge(metric.getKey().name(), new 
KafkaMetricWrapper(metric.getValue()));
-                               }
-                       }
-               }
-
-               if (flushOnCheckpoint && 
!((StreamingRuntimeContext)this.getRuntimeContext()).isCheckpointingEnabled()) {
-                       LOG.warn("Flushing on checkpoint is enabled, but 
checkpointing is not enabled. Disabling flushing.");
-                       flushOnCheckpoint = false;
-               }
-
-               if (logFailuresOnly) {
-                       callback = new Callback() {
-                               @Override
-                               public void onCompletion(RecordMetadata 
metadata, Exception e) {
-                                       if (e != null) {
-                                               LOG.error("Error while sending 
record to Kafka: " + e.getMessage(), e);
-                                       }
-                                       acknowledgeMessage();
-                               }
-                       };
-               }
-               else {
-                       callback = new Callback() {
-                               @Override
-                               public void onCompletion(RecordMetadata 
metadata, Exception exception) {
-                                       if (exception != null && asyncException 
== null) {
-                                               asyncException = exception;
-                                       }
-                                       acknowledgeMessage();
-                               }
-                       };
-               }
-       }
-
-       /**
-        * Called when new data arrives to the sink, and forwards it to Kafka.
-        *
-        * @param next
-        *              The incoming data
-        */
-       @Override
-       public void invoke(IN next) throws Exception {
-               // propagate asynchronous errors
-               checkErroneous();
-
-               byte[] serializedKey = schema.serializeKey(next);
-               byte[] serializedValue = schema.serializeValue(next);
-               String targetTopic = schema.getTargetTopic(next);
-               if (targetTopic == null) {
-                       targetTopic = defaultTopicId;
-               }
-
-               ProducerRecord<byte[], byte[]> record;
-               if (partitioner == null) {
-                       record = new ProducerRecord<>(targetTopic, 
serializedKey, serializedValue);
-               } else {
-                       record = new ProducerRecord<>(targetTopic, 
partitioner.partition(next, serializedKey, serializedValue, partitions.length), 
serializedKey, serializedValue);
-               }
-               if (flushOnCheckpoint) {
-                       synchronized (pendingRecordsLock) {
-                               pendingRecords++;
-                       }
-               }
-               producer.send(record, callback);
-       }
-
-
-       @Override
-       public void close() throws Exception {
-               if (producer != null) {
-                       producer.close();
-               }
-               
-               // make sure we propagate pending errors
-               checkErroneous();
-       }
-
-       // ------------------- Logic for handling checkpoint flushing 
-------------------------- //
-
-       private void acknowledgeMessage() {
-               if (flushOnCheckpoint) {
-                       synchronized (pendingRecordsLock) {
-                               pendingRecords--;
-                               if (pendingRecords == 0) {
-                                       pendingRecordsLock.notifyAll();
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Flush pending records.
-        */
-       protected abstract void flush();
-
-       @Override
-       public void initializeState(FunctionInitializationContext context) 
throws Exception {
-               this.stateStore = context.getOperatorStateStore();
-       }
-
-       @Override
-       public void snapshotState(FunctionSnapshotContext ctx) throws Exception 
{
-               if (flushOnCheckpoint) {
-                       // flushing is activated: We need to wait until 
pendingRecords is 0
-                       flush();
-                       synchronized (pendingRecordsLock) {
-                               if (pendingRecords != 0) {
-                                       throw new 
IllegalStateException("Pending record count must be zero at this point: " + 
pendingRecords);
-                               }
-                               // pending records count is 0. We can now 
confirm the checkpoint
-                       }
-               }
-       }
-
-       // ----------------------------------- Utilities 
--------------------------
-
-       protected void checkErroneous() throws Exception {
-               Exception e = asyncException;
-               if (e != null) {
-                       // prevent double throwing
-                       asyncException = null;
-                       throw new Exception("Failed to send data to Kafka: " + 
e.getMessage(), e);
-               }
-       }
-       
-       public static Properties getPropertiesFromBrokerList(String brokerList) 
{
-               String[] elements = brokerList.split(",");
-               
-               // validate the broker addresses
-               for (String broker: elements) {
-                       NetUtils.getCorrectHostnamePort(broker);
-               }
-               
-               Properties props = new Properties();
-               props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokerList);
-               return props;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
deleted file mode 100644
index ee98783..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import java.util.Properties;
-
-/**
- * Base class for {@link KafkaTableSink} that serializes data in JSON format
- */
-public abstract class KafkaJsonTableSink extends KafkaTableSink {
-
-       /**
-        * Creates KafkaJsonTableSink
-        *
-        * @param topic topic in Kafka to which table is written
-        * @param properties properties to connect to Kafka
-        * @param partitioner Kafka partitioner
-        */
-       public KafkaJsonTableSink(String topic, Properties properties, 
KafkaPartitioner<Row> partitioner) {
-               super(topic, properties, partitioner);
-       }
-
-       @Override
-       protected SerializationSchema<Row> createSerializationSchema(String[] 
fieldNames) {
-               return new JsonRowSerializationSchema(fieldNames);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
deleted file mode 100644
index f145509..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * A version-agnostic Kafka JSON {@link StreamTableSource}.
- *
- * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
- *
- * <p>The field names are used to parse the JSON file and so are the types.
- */
-public abstract class KafkaJsonTableSource extends KafkaTableSource {
-
-       /**
-        * Creates a generic Kafka JSON {@link StreamTableSource}.
-        *
-        * @param topic      Kafka topic to consume.
-        * @param properties Properties for the Kafka consumer.
-        * @param fieldNames Row field names.
-        * @param fieldTypes Row field types.
-        */
-       KafkaJsonTableSource(
-                       String topic,
-                       Properties properties,
-                       String[] fieldNames,
-                       Class<?>[] fieldTypes) {
-
-               super(topic, properties, 
createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
-       }
-
-       /**
-        * Creates a generic Kafka JSON {@link StreamTableSource}.
-        *
-        * @param topic      Kafka topic to consume.
-        * @param properties Properties for the Kafka consumer.
-        * @param fieldNames Row field names.
-        * @param fieldTypes Row field types.
-        */
-       KafkaJsonTableSource(
-                       String topic,
-                       Properties properties,
-                       String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes) {
-
-               super(topic, properties, 
createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
-       }
-
-       /**
-        * Configures the failure behaviour if a JSON field is missing.
-        *
-        * <p>By default, a missing field is ignored and the field is set to 
null.
-        *
-        * @param failOnMissingField Flag indicating whether to fail or not on 
a missing field.
-        */
-       public void setFailOnMissingField(boolean failOnMissingField) {
-               JsonRowDeserializationSchema deserializationSchema = 
(JsonRowDeserializationSchema) getDeserializationSchema();
-               deserializationSchema.setFailOnMissingField(failOnMissingField);
-       }
-
-       private static JsonRowDeserializationSchema createDeserializationSchema(
-                       String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes) {
-
-               return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
-       }
-
-       private static JsonRowDeserializationSchema createDeserializationSchema(
-                       String[] fieldNames,
-                       Class<?>[] fieldTypes) {
-
-               return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
deleted file mode 100644
index 714d9cd..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.sinks.StreamTableSink;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Properties;
-
-/**
- * A version-agnostic Kafka {@link StreamTableSink}.
- *
- * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #createKafkaProducer(String, Properties, 
SerializationSchema, KafkaPartitioner)}}.
- */
-public abstract class KafkaTableSink implements StreamTableSink<Row> {
-
-       protected final String topic;
-       protected final Properties properties;
-       protected SerializationSchema<Row> serializationSchema;
-       protected final KafkaPartitioner<Row> partitioner;
-       protected String[] fieldNames;
-       protected TypeInformation[] fieldTypes;
-
-       /**
-        * Creates KafkaTableSink
-        *
-        * @param topic                 Kafka topic to write to.
-        * @param properties            Properties for the Kafka consumer.
-        * @param partitioner           Partitioner to select Kafka partition 
for each item
-        */
-       public KafkaTableSink(
-                       String topic,
-                       Properties properties,
-                       KafkaPartitioner<Row> partitioner) {
-
-               this.topic = Preconditions.checkNotNull(topic, "topic");
-               this.properties = Preconditions.checkNotNull(properties, 
"properties");
-               this.partitioner = Preconditions.checkNotNull(partitioner, 
"partitioner");
-       }
-
-       /**
-        * Returns the version-specifid Kafka producer.
-        *
-        * @param topic               Kafka topic to produce to.
-        * @param properties          Properties for the Kafka producer.
-        * @param serializationSchema Serialization schema to use to create 
Kafka records.
-        * @param partitioner         Partitioner to select Kafka partition.
-        * @return The version-specific Kafka producer
-        */
-       protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
-               String topic, Properties properties,
-               SerializationSchema<Row> serializationSchema,
-               KafkaPartitioner<Row> partitioner);
-
-       /**
-        * Create serialization schema for converting table rows into bytes.
-        *
-        * @param fieldNames Field names in table rows.
-        * @return Instance of serialization schema
-        */
-       protected abstract SerializationSchema<Row> 
createSerializationSchema(String[] fieldNames);
-
-       /**
-        * Create a deep copy of this sink.
-        *
-        * @return Deep copy of this sink
-        */
-       protected abstract KafkaTableSink createCopy();
-
-       @Override
-       public void emitDataStream(DataStream<Row> dataStream) {
-               FlinkKafkaProducerBase<Row> kafkaProducer = 
createKafkaProducer(topic, properties, serializationSchema, partitioner);
-               dataStream.addSink(kafkaProducer);
-       }
-
-       @Override
-       public TypeInformation<Row> getOutputType() {
-               return new RowTypeInfo(getFieldTypes());
-       }
-
-       public String[] getFieldNames() {
-               return fieldNames;
-       }
-
-       @Override
-       public TypeInformation<?>[] getFieldTypes() {
-               return fieldTypes;
-       }
-
-       @Override
-       public KafkaTableSink configure(String[] fieldNames, 
TypeInformation<?>[] fieldTypes) {
-               KafkaTableSink copy = createCopy();
-               copy.fieldNames = Preconditions.checkNotNull(fieldNames, 
"fieldNames");
-               copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, 
"fieldTypes");
-               Preconditions.checkArgument(fieldNames.length == 
fieldTypes.length,
-                       "Number of provided field names and types does not 
match.");
-               copy.serializationSchema = 
createSerializationSchema(fieldNames);
-
-               return copy;
-       }
-
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
deleted file mode 100644
index fd423d7..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Properties;
-
-import static 
org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.toTypeInfo;
-
-/**
- * A version-agnostic Kafka {@link StreamTableSource}.
- *
- * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
- */
-public abstract class KafkaTableSource implements StreamTableSource<Row> {
-
-       /** The Kafka topic to consume. */
-       private final String topic;
-
-       /** Properties for the Kafka consumer. */
-       private final Properties properties;
-
-       /** Deserialization schema to use for Kafka records. */
-       private final DeserializationSchema<Row> deserializationSchema;
-
-       /** Row field names. */
-       private final String[] fieldNames;
-
-       /** Row field types. */
-       private final TypeInformation<?>[] fieldTypes;
-
-       /**
-        * Creates a generic Kafka {@link StreamTableSource}.
-        *
-        * @param topic                 Kafka topic to consume.
-        * @param properties            Properties for the Kafka consumer.
-        * @param deserializationSchema Deserialization schema to use for Kafka 
records.
-        * @param fieldNames            Row field names.
-        * @param fieldTypes            Row field types.
-        */
-       KafkaTableSource(
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       String[] fieldNames,
-                       Class<?>[] fieldTypes) {
-
-               this(topic, properties, deserializationSchema, fieldNames, 
toTypeInfo(fieldTypes));
-       }
-
-       /**
-        * Creates a generic Kafka {@link StreamTableSource}.
-        *
-        * @param topic                 Kafka topic to consume.
-        * @param properties            Properties for the Kafka consumer.
-        * @param deserializationSchema Deserialization schema to use for Kafka 
records.
-        * @param fieldNames            Row field names.
-        * @param fieldTypes            Row field types.
-        */
-       KafkaTableSource(
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes) {
-
-               this.topic = Preconditions.checkNotNull(topic, "Topic");
-               this.properties = Preconditions.checkNotNull(properties, 
"Properties");
-               this.deserializationSchema = 
Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
-               this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field 
names");
-               this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field 
types");
-
-               Preconditions.checkArgument(fieldNames.length == 
fieldTypes.length,
-                               "Number of provided field names and types does 
not match.");
-       }
-
-       /**
-        * NOTE: This method is for internal use only for defining a 
TableSource.
-        *       Do not use it in Table API programs.
-        */
-       @Override
-       public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
-               // Version-specific Kafka consumer
-               FlinkKafkaConsumerBase<Row> kafkaConsumer = 
getKafkaConsumer(topic, properties, deserializationSchema);
-               DataStream<Row> kafkaSource = env.addSource(kafkaConsumer);
-               return kafkaSource;
-       }
-
-       @Override
-       public int getNumberOfFields() {
-               return fieldNames.length;
-       }
-
-       @Override
-       public String[] getFieldsNames() {
-               return fieldNames;
-       }
-
-       @Override
-       public TypeInformation<?>[] getFieldTypes() {
-               return fieldTypes;
-       }
-
-       @Override
-       public TypeInformation<Row> getReturnType() {
-               return new RowTypeInfo(fieldTypes);
-       }
-
-       /**
-        * Returns the version-specific Kafka consumer.
-        *
-        * @param topic                 Kafka topic to consume.
-        * @param properties            Properties for the Kafka consumer.
-        * @param deserializationSchema Deserialization schema to use for Kafka 
records.
-        * @return The version-specific Kafka consumer
-        */
-       abstract FlinkKafkaConsumerBase<Row> getKafkaConsumer(
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema);
-
-       /**
-        * Returns the deserialization schema.
-        *
-        * @return The deserialization schema
-        */
-       protected DeserializationSchema<Row> getDeserializationSchema() {
-               return deserializationSchema;
-       }
-}

Reply via email to