http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
deleted file mode 100644
index c28799c..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class KafkaShortRetention08ITCase extends KafkaShortRetentionTestBase {
-
-       @Test(timeout=60000)
-       public void testAutoOffsetReset() throws Exception {
-               runAutoOffsetResetTest();
-       }
-
-       @Test(timeout=60000)
-       public void testAutoOffsetResetNone() throws Exception {
-               runFailOnAutoOffsetResetNoneEager();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
deleted file mode 100644
index 6235449..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.admin.AdminUtils;
-import kafka.api.PartitionMetadata;
-import kafka.common.KafkaException;
-import kafka.network.SocketServer;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.io.FileUtils;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
-import 
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import 
org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.Seq;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.BindException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-
-import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * An implementation of the KafkaServerProvider for Kafka 0.8
- */
-public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
-
-       protected static final Logger LOG = 
LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
-       private File tmpZkDir;
-       private File tmpKafkaParent;
-       private List<File> tmpKafkaDirs;
-       private List<KafkaServer> brokers;
-       private TestingServer zookeeper;
-       private String zookeeperConnectionString;
-       private String brokerConnectionString = "";
-       private Properties standardProps;
-       private Properties additionalServerProperties;
-
-       public String getBrokerConnectionString() {
-               return brokerConnectionString;
-       }
-
-       @Override
-       public Properties getStandardProperties() {
-               return standardProps;
-       }
-
-       @Override
-       public Properties getSecureProperties() {
-               return null;
-       }
-
-       @Override
-       public String getVersion() {
-               return "0.8";
-       }
-
-       @Override
-       public List<KafkaServer> getBrokers() {
-               return brokers;
-       }
-
-       @Override
-       public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, 
KeyedDeserializationSchema<T> readSchema, Properties props) {
-               return new FlinkKafkaConsumer08<>(topics, readSchema, props);
-       }
-
-       @Override
-       public <T> StreamSink<T> getProducerSink(
-                       String topic,
-                       KeyedSerializationSchema<T> serSchema,
-                       Properties props,
-                       KafkaPartitioner<T> partitioner) {
-               FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(
-                               topic,
-                               serSchema,
-                               props,
-                               partitioner);
-               prod.setFlushOnCheckpoint(true);
-               return new StreamSink<>(prod);
-       }
-
-       @Override
-       public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, 
String topic, KeyedSerializationSchema<T> serSchema, Properties props, 
KafkaPartitioner<T> partitioner) {
-               FlinkKafkaProducer08<T> prod = new 
FlinkKafkaProducer08<>(topic, serSchema, props, partitioner);
-               prod.setFlushOnCheckpoint(true);
-               return stream.addSink(prod);
-       }
-
-       @Override
-       public KafkaOffsetHandler createOffsetHandler(Properties props) {
-               return new KafkaOffsetHandlerImpl(props);
-       }
-
-       @Override
-       public void restartBroker(int leaderId) throws Exception {
-               brokers.set(leaderId, getKafkaServer(leaderId, 
tmpKafkaDirs.get(leaderId)));
-       }
-
-       @Override
-       public int getLeaderToShutDown(String topic) throws Exception {
-               ZkClient zkClient = createZkClient();
-               PartitionMetadata firstPart = null;
-               do {
-                       if (firstPart != null) {
-                               LOG.info("Unable to find leader. error code 
{}", firstPart.errorCode());
-                               // not the first try. Sleep a bit
-                               Thread.sleep(150);
-                       }
-
-                       Seq<PartitionMetadata> partitionMetadata = 
AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
-                       firstPart = partitionMetadata.head();
-               }
-               while (firstPart.errorCode() != 0);
-               zkClient.close();
-
-               return firstPart.leader().get().id();
-       }
-
-       @Override
-       public int getBrokerId(KafkaServer server) {
-               return server.socketServer().brokerId();
-       }
-
-       @Override
-       public boolean isSecureRunSupported() {
-               return false;
-       }
-
-
-       @Override
-       public void prepare(int numKafkaServers, Properties 
additionalServerProperties, boolean secureMode) {
-               this.additionalServerProperties = additionalServerProperties;
-               File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
-               tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + 
(UUID.randomUUID().toString()));
-               try {
-                       Files.createDirectories(tmpZkDir.toPath());
-               } catch (IOException e) {
-                       fail("cannot create zookeeper temp dir: " + 
e.getMessage());
-               }
-
-               tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir" + 
(UUID.randomUUID().toString()));
-               try {
-                       Files.createDirectories(tmpKafkaParent.toPath());
-               } catch (IOException e) {
-                       fail("cannot create kafka temp dir: " + e.getMessage());
-               }
-
-               tmpKafkaDirs = new ArrayList<>(numKafkaServers);
-               for (int i = 0; i < numKafkaServers; i++) {
-                       File tmpDir = new File(tmpKafkaParent, "server-" + i);
-                       assertTrue("cannot create kafka temp dir", 
tmpDir.mkdir());
-                       tmpKafkaDirs.add(tmpDir);
-               }
-
-               zookeeper = null;
-               brokers = null;
-
-               try {
-                       LOG.info("Starting Zookeeper");
-                       zookeeper = new TestingServer(-1, tmpZkDir);
-                       zookeeperConnectionString = 
zookeeper.getConnectString();
-
-                       LOG.info("Starting KafkaServer");
-                       brokers = new ArrayList<>(numKafkaServers);
-
-                       for (int i = 0; i < numKafkaServers; i++) {
-                               brokers.add(getKafkaServer(i, 
tmpKafkaDirs.get(i)));
-                               SocketServer socketServer = 
brokers.get(i).socketServer();
-
-                               String host = socketServer.host() == null ? 
"localhost" : socketServer.host();
-                               brokerConnectionString += 
hostAndPortToUrlString(host, socketServer.port()) + ",";
-                       }
-
-                       LOG.info("ZK and KafkaServer started.");
-               }
-               catch (Throwable t) {
-                       t.printStackTrace();
-                       fail("Test setup failed: " + t.getMessage());
-               }
-
-               standardProps = new Properties();
-               standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
-               standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
-               standardProps.setProperty("group.id", "flink-tests");
-               standardProps.setProperty("auto.commit.enable", "false");
-               standardProps.setProperty("zookeeper.session.timeout.ms", 
"30000"); // 6 seconds is default. Seems to be too small for travis.
-               standardProps.setProperty("zookeeper.connection.timeout.ms", 
"30000");
-               standardProps.setProperty("auto.offset.reset", "smallest"); // 
read from the beginning. (smallest is kafka 0.8)
-               standardProps.setProperty("fetch.message.max.bytes", "256"); // 
make a lot of fetches (MESSAGES MUST BE SMALLER!)
-       }
-
-       @Override
-       public void shutdown() {
-               if (brokers != null) {
-                       for (KafkaServer broker : brokers) {
-                               if (broker != null) {
-                                       broker.shutdown();
-                               }
-                       }
-                       brokers.clear();
-               }
-
-               if (zookeeper != null) {
-                       try {
-                               zookeeper.stop();
-                               zookeeper.close();
-                       }
-                       catch (Exception e) {
-                               LOG.warn("ZK.stop() failed", e);
-                       }
-                       zookeeper = null;
-               }
-
-               // clean up the temp spaces
-
-               if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
-                       try {
-                               FileUtils.deleteDirectory(tmpKafkaParent);
-                       }
-                       catch (Exception e) {
-                               // ignore
-                       }
-               }
-               if (tmpZkDir != null && tmpZkDir.exists()) {
-                       try {
-                               FileUtils.deleteDirectory(tmpZkDir);
-                       }
-                       catch (Exception e) {
-                               // ignore
-                       }
-               }
-       }
-
-       @Override
-       public void createTestTopic(String topic, int numberOfPartitions, int 
replicationFactor, Properties topicConfig) {
-               // create topic with one client
-               LOG.info("Creating topic {}", topic);
-
-               ZkClient creator = createZkClient();
-
-               AdminUtils.createTopic(creator, topic, numberOfPartitions, 
replicationFactor, topicConfig);
-               creator.close();
-
-               // validate that the topic has been created
-               final long deadline = System.currentTimeMillis() + 30000;
-               do {
-                       try {
-                               Thread.sleep(100);
-                       }
-                       catch (InterruptedException e) {
-                               // restore interrupted state
-                       }
-                       List<KafkaTopicPartitionLeader> partitions = 
FlinkKafkaConsumer08.getPartitionsForTopic(Collections.singletonList(topic), 
standardProps);
-                       if (partitions != null && partitions.size() > 0) {
-                               return;
-                       }
-               }
-               while (System.currentTimeMillis() < deadline);
-               fail ("Test topic could not be created");
-       }
-
-       @Override
-       public void deleteTestTopic(String topic) {
-               LOG.info("Deleting topic {}", topic);
-
-               ZkClient zk = createZkClient();
-               AdminUtils.deleteTopic(zk, topic);
-               zk.close();
-       }
-
-       private ZkClient createZkClient() {
-               return new ZkClient(zookeeperConnectionString, 
Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
-                               
Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), 
new ZooKeeperStringSerializer());
-       }
-
-       /**
-        * Only for the 0.8 server we need access to the zk client.
-        */
-       public CuratorFramework createCuratorClient() {
-               RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 10);
-               CuratorFramework curatorClient = 
CuratorFrameworkFactory.newClient(standardProps.getProperty("zookeeper.connect"),
 retryPolicy);
-               curatorClient.start();
-               return curatorClient;
-       }
-
-       /**
-        * Copied from 
com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
-        */
-       protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) 
throws Exception {
-               LOG.info("Starting broker with id {}", brokerId);
-               Properties kafkaProperties = new Properties();
-
-               // properties have to be Strings
-               kafkaProperties.put("advertised.host.name", KAFKA_HOST);
-               kafkaProperties.put("broker.id", Integer.toString(brokerId));
-               kafkaProperties.put("log.dir", tmpFolder.toString());
-               kafkaProperties.put("zookeeper.connect", 
zookeeperConnectionString);
-               kafkaProperties.put("message.max.bytes", String.valueOf(50 * 
1024 * 1024));
-               kafkaProperties.put("replica.fetch.max.bytes", 
String.valueOf(50 * 1024 * 1024));
-
-               // for CI stability, increase zookeeper session timeout
-               kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
-               kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
-               if(additionalServerProperties != null) {
-                       kafkaProperties.putAll(additionalServerProperties);
-               }
-
-               final int numTries = 5;
-
-               for (int i = 1; i <= numTries; i++) {
-                       int kafkaPort = NetUtils.getAvailablePort();
-                       kafkaProperties.put("port", 
Integer.toString(kafkaPort));
-                       KafkaConfig kafkaConfig = new 
KafkaConfig(kafkaProperties);
-
-                       try {
-                               KafkaServer server = new 
KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
-                               server.startup();
-                               return server;
-                       }
-                       catch (KafkaException e) {
-                               if (e.getCause() instanceof BindException) {
-                                       // port conflict, retry...
-                                       LOG.info("Port conflict when starting 
Kafka Broker. Retrying...");
-                               }
-                               else {
-                                       throw e;
-                               }
-                       }
-               }
-
-               throw new Exception("Could not start Kafka after " + numTries + 
" retries due to port conflicts.");
-       }
-
-       private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
-
-               private final CuratorFramework offsetClient;
-               private final String groupId;
-
-               public KafkaOffsetHandlerImpl(Properties props) {
-                       offsetClient = createCuratorClient();
-                       groupId = props.getProperty("group.id");
-               }
-
-               @Override
-               public Long getCommittedOffset(String topicName, int partition) 
{
-                       try {
-                               return 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(offsetClient, groupId, topicName, 
partition);
-                       } catch (Exception e) {
-                               throw new RuntimeException("Exception when 
getting offsets from Zookeeper", e);
-                       }
-               }
-
-               @Override
-               public void close() {
-                       offsetClient.close();
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
deleted file mode 100644
index 6298c92..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
+++ /dev/null
@@ -1,603 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.*;
-import static java.util.Arrays.asList;
-import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
-
-public class ClosableBlockingQueueTest {
-
-       // 
------------------------------------------------------------------------
-       //  single-threaded unit tests
-       // 
------------------------------------------------------------------------
-       
-       @Test
-       public void testCreateQueueHashCodeEquals() {
-               try {
-                       ClosableBlockingQueue<String> queue1 = new 
ClosableBlockingQueue<>();
-                       ClosableBlockingQueue<String> queue2 = new 
ClosableBlockingQueue<>(22);
-
-                       assertTrue(queue1.isOpen());
-                       assertTrue(queue2.isOpen());
-                       assertTrue(queue1.isEmpty());
-                       assertTrue(queue2.isEmpty());
-                       assertEquals(0, queue1.size());
-                       assertEquals(0, queue2.size());
-                       
-                       assertTrue(queue1.hashCode() == queue2.hashCode());
-                       //noinspection EqualsWithItself
-                       assertTrue(queue1.equals(queue1));
-                       //noinspection EqualsWithItself
-                       assertTrue(queue2.equals(queue2));
-                       assertTrue(queue1.equals(queue2));
-                       
-                       assertNotNull(queue1.toString());
-                       assertNotNull(queue2.toString());
-
-                       List<String> elements = new ArrayList<>();
-                       elements.add("a");
-                       elements.add("b");
-                       elements.add("c");
-
-                       ClosableBlockingQueue<String> queue3 = new 
ClosableBlockingQueue<>(elements);
-                       ClosableBlockingQueue<String> queue4 = new 
ClosableBlockingQueue<>(asList("a", "b", "c"));
-
-                       assertTrue(queue3.isOpen());
-                       assertTrue(queue4.isOpen());
-                       assertFalse(queue3.isEmpty());
-                       assertFalse(queue4.isEmpty());
-                       assertEquals(3, queue3.size());
-                       assertEquals(3, queue4.size());
-
-                       assertTrue(queue3.hashCode() == queue4.hashCode());
-                       //noinspection EqualsWithItself
-                       assertTrue(queue3.equals(queue3));
-                       //noinspection EqualsWithItself
-                       assertTrue(queue4.equals(queue4));
-                       assertTrue(queue3.equals(queue4));
-                       
-                       assertNotNull(queue3.toString());
-                       assertNotNull(queue4.toString());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCloseEmptyQueue() {
-               try {
-                       ClosableBlockingQueue<String> queue = new 
ClosableBlockingQueue<>();
-                       assertTrue(queue.isOpen());
-                       assertTrue(queue.close());
-                       assertFalse(queue.isOpen());
-                       
-                       assertFalse(queue.addIfOpen("element"));
-                       assertTrue(queue.isEmpty());
-                       
-                       try {
-                               queue.add("some element");
-                               fail("should cause an exception");
-                       } catch (IllegalStateException ignored) {
-                               // expected
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testCloseNonEmptyQueue() {
-               try {
-                       ClosableBlockingQueue<Integer> queue = new 
ClosableBlockingQueue<>(asList(1, 2, 3));
-                       assertTrue(queue.isOpen());
-                       
-                       assertFalse(queue.close());
-                       assertFalse(queue.close());
-                       
-                       queue.poll();
-
-                       assertFalse(queue.close());
-                       assertFalse(queue.close());
-                       
-                       queue.pollBatch();
-
-                       assertTrue(queue.close());
-                       assertFalse(queue.isOpen());
-
-                       assertFalse(queue.addIfOpen(42));
-                       assertTrue(queue.isEmpty());
-
-                       try {
-                               queue.add(99);
-                               fail("should cause an exception");
-                       } catch (IllegalStateException ignored) {
-                               // expected
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testPeekAndPoll() {
-               try {
-                       ClosableBlockingQueue<String> queue = new 
ClosableBlockingQueue<>();
-                       
-                       assertNull(queue.peek());
-                       assertNull(queue.peek());
-                       assertNull(queue.poll());
-                       assertNull(queue.poll());
-                       
-                       assertEquals(0, queue.size());
-                       
-                       queue.add("a");
-                       queue.add("b");
-                       queue.add("c");
-
-                       assertEquals(3, queue.size());
-                       
-                       assertEquals("a", queue.peek());
-                       assertEquals("a", queue.peek());
-                       assertEquals("a", queue.peek());
-
-                       assertEquals(3, queue.size());
-                       
-                       assertEquals("a", queue.poll());
-                       assertEquals("b", queue.poll());
-
-                       assertEquals(1, queue.size());
-                       
-                       assertEquals("c", queue.peek());
-                       assertEquals("c", queue.peek());
-
-                       assertEquals("c", queue.poll());
-
-                       assertEquals(0, queue.size());
-                       assertNull(queue.poll());
-                       assertNull(queue.peek());
-                       assertNull(queue.peek());
-                       
-                       assertTrue(queue.close());
-                       
-                       try {
-                               queue.peek();
-                               fail("should cause an exception");
-                       } catch (IllegalStateException ignored) {
-                               // expected
-                       }
-
-                       try {
-                               queue.poll();
-                               fail("should cause an exception");
-                       } catch (IllegalStateException ignored) {
-                               // expected
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testPollBatch() {
-               try {
-                       ClosableBlockingQueue<String> queue = new 
ClosableBlockingQueue<>();
-
-                       assertNull(queue.pollBatch());
-                       
-                       queue.add("a");
-                       queue.add("b");
-                       
-                       assertEquals(asList("a", "b"), queue.pollBatch());
-                       assertNull(queue.pollBatch());
-                       
-                       queue.add("c");
-
-                       assertEquals(singletonList("c"), queue.pollBatch());
-                       assertNull(queue.pollBatch());
-
-                       assertTrue(queue.close());
-
-                       try {
-                               queue.pollBatch();
-                               fail("should cause an exception");
-                       } catch (IllegalStateException ignored) {
-                               // expected
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testGetElementBlocking() {
-               try {
-                       ClosableBlockingQueue<String> queue = new 
ClosableBlockingQueue<>();
-
-                       assertNull(queue.getElementBlocking(1));
-                       assertNull(queue.getElementBlocking(3));
-                       assertNull(queue.getElementBlocking(2));
-
-                       assertEquals(0, queue.size());
-
-                       queue.add("a");
-                       queue.add("b");
-                       queue.add("c");
-                       queue.add("d");
-                       queue.add("e");
-                       queue.add("f");
-
-                       assertEquals(6, queue.size());
-
-                       assertEquals("a", queue.getElementBlocking(99));
-                       assertEquals("b", queue.getElementBlocking());
-
-                       assertEquals(4, queue.size());
-
-                       assertEquals("c", queue.getElementBlocking(0));
-                       assertEquals("d", queue.getElementBlocking(1000000));
-                       assertEquals("e", queue.getElementBlocking());
-                       assertEquals("f", queue.getElementBlocking(1786598));
-
-                       assertEquals(0, queue.size());
-
-                       assertNull(queue.getElementBlocking(1));
-                       assertNull(queue.getElementBlocking(3));
-                       assertNull(queue.getElementBlocking(2));
-
-                       assertTrue(queue.close());
-
-                       try {
-                               queue.getElementBlocking();
-                               fail("should cause an exception");
-                       } catch (IllegalStateException ignored) {
-                               // expected
-                       }
-
-                       try {
-                               queue.getElementBlocking(1000000000L);
-                               fail("should cause an exception");
-                       } catch (IllegalStateException ignored) {
-                               // expected
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testGetBatchBlocking() {
-               try {
-                       ClosableBlockingQueue<String> queue = new 
ClosableBlockingQueue<>();
-
-                       assertEquals(emptyList(), queue.getBatchBlocking(1));
-                       assertEquals(emptyList(), queue.getBatchBlocking(3));
-                       assertEquals(emptyList(), queue.getBatchBlocking(2));
-
-                       queue.add("a");
-                       queue.add("b");
-
-                       assertEquals(asList("a", "b"), 
queue.getBatchBlocking(900000009));
-
-                       queue.add("c");
-                       queue.add("d");
-
-                       assertEquals(asList("c", "d"), 
queue.getBatchBlocking());
-
-                       assertEquals(emptyList(), queue.getBatchBlocking(2));
-
-                       queue.add("e");
-
-                       assertEquals(singletonList("e"), 
queue.getBatchBlocking(0));
-
-                       queue.add("f");
-
-                       assertEquals(singletonList("f"), 
queue.getBatchBlocking(1000000000));
-
-                       assertEquals(0, queue.size());
-
-                       assertEquals(emptyList(), queue.getBatchBlocking(1));
-                       assertEquals(emptyList(), queue.getBatchBlocking(3));
-                       assertEquals(emptyList(), queue.getBatchBlocking(2));
-
-                       assertTrue(queue.close());
-
-                       try {
-                               queue.getBatchBlocking();
-                               fail("should cause an exception");
-                       } catch (IllegalStateException ignored) {
-                               // expected
-                       }
-
-                       try {
-                               queue.getBatchBlocking(1000000000L);
-                               fail("should cause an exception");
-                       } catch (IllegalStateException ignored) {
-                               // expected
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  multi-threaded tests
-       // 
------------------------------------------------------------------------
-       
-       @Test
-       public void notifyOnClose() {
-               try {
-                       final long oneYear = 365L * 24 * 60 * 60 * 1000;
-                       
-                       // test "getBatchBlocking()"
-                       final ClosableBlockingQueue<String> queue1 = new 
ClosableBlockingQueue<>();
-                       QueueCall call1 = new QueueCall() {
-                               @Override
-                               public void call() throws Exception {
-                                       queue1.getBatchBlocking();
-                               }
-                       };
-                       testCallExitsOnClose(call1, queue1);
-
-                       // test "getBatchBlocking()"
-                       final ClosableBlockingQueue<String> queue2 = new 
ClosableBlockingQueue<>();
-                       QueueCall call2 = new QueueCall() {
-                               @Override
-                               public void call() throws Exception {
-                                       queue2.getBatchBlocking(oneYear);
-                               }
-                       };
-                       testCallExitsOnClose(call2, queue2);
-
-                       // test "getBatchBlocking()"
-                       final ClosableBlockingQueue<String> queue3 = new 
ClosableBlockingQueue<>();
-                       QueueCall call3 = new QueueCall() {
-                               @Override
-                               public void call() throws Exception {
-                                       queue3.getElementBlocking();
-                               }
-                       };
-                       testCallExitsOnClose(call3, queue3);
-
-                       // test "getBatchBlocking()"
-                       final ClosableBlockingQueue<String> queue4 = new 
ClosableBlockingQueue<>();
-                       QueueCall call4 = new QueueCall() {
-                               @Override
-                               public void call() throws Exception {
-                                       queue4.getElementBlocking(oneYear);
-                               }
-                       };
-                       testCallExitsOnClose(call4, queue4);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-       @Test
-       public void testMultiThreadedAddGet() {
-               try {
-                       final ClosableBlockingQueue<Integer> queue = new 
ClosableBlockingQueue<>();
-                       final AtomicReference<Throwable> pushErrorRef = new 
AtomicReference<>();
-                       final AtomicReference<Throwable> pollErrorRef = new 
AtomicReference<>();
-                       
-                       final int numElements = 2000;
-                       
-                       Thread pusher = new Thread("pusher") {
-
-                               @Override
-                               public void run() {
-                                       try {
-                                               final Random rnd = new Random();
-                                               for (int i = 0; i < 
numElements; i++) {
-                                                       queue.add(i);
-                                                       
-                                                       // sleep a bit, 
sometimes
-                                                       int sleepTime = 
rnd.nextInt(3);
-                                                       if (sleepTime > 1) {
-                                                               
Thread.sleep(sleepTime);
-                                                       }
-                                               }
-                                               
-                                               while (true) {
-                                                       if (queue.close()) {
-                                                               break;
-                                                       } else {
-                                                               Thread.sleep(5);
-                                                       }
-                                               }
-                                       } catch (Throwable t) {
-                                               pushErrorRef.set(t);
-                                       }
-                               }
-                       };
-                       pusher.start();
-
-                       Thread poller = new Thread("poller") {
-
-                               @SuppressWarnings("InfiniteLoopStatement")
-                               @Override
-                               public void run() {
-                                       try {
-                                               int count = 0;
-                                               
-                                               try {
-                                                       final Random rnd = new 
Random();
-                                                       int nextExpected = 0;
-                                                       
-                                                       while (true) {
-                                                               int getMethod = 
count % 7;
-                                                               switch 
(getMethod) {
-                                                                       case 0: 
{
-                                                                               
Integer next = queue.getElementBlocking(1);
-                                                                               
if (next != null) {
-                                                                               
        assertEquals(nextExpected, next.intValue());
-                                                                               
        nextExpected++;
-                                                                               
        count++;
-                                                                               
}
-                                                                               
break;
-                                                                       }
-                                                                       case 1: 
{
-                                                                               
List<Integer> nextList = queue.getBatchBlocking();
-                                                                               
for (Integer next : nextList) {
-                                                                               
        assertNotNull(next);
-                                                                               
        assertEquals(nextExpected, next.intValue());
-                                                                               
        nextExpected++;
-                                                                               
        count++;
-                                                                               
}
-                                                                               
break;
-                                                                       }
-                                                                       case 2: 
{
-                                                                               
List<Integer> nextList = queue.getBatchBlocking(1);
-                                                                               
if (nextList != null) {
-                                                                               
        for (Integer next : nextList) {
-                                                                               
                assertNotNull(next);
-                                                                               
                assertEquals(nextExpected, next.intValue());
-                                                                               
                nextExpected++;
-                                                                               
                count++;
-                                                                               
        }
-                                                                               
}
-                                                                               
break;
-                                                                       }
-                                                                       case 3: 
{
-                                                                               
Integer next = queue.poll();
-                                                                               
if (next != null) {
-                                                                               
        assertEquals(nextExpected, next.intValue());
-                                                                               
        nextExpected++;
-                                                                               
        count++;
-                                                                               
}
-                                                                               
break;
-                                                                       }
-                                                                       case 4: 
{
-                                                                               
List<Integer> nextList = queue.pollBatch();
-                                                                               
if (nextList != null) {
-                                                                               
        for (Integer next : nextList) {
-                                                                               
                assertNotNull(next);
-                                                                               
                assertEquals(nextExpected, next.intValue());
-                                                                               
                nextExpected++;
-                                                                               
                count++;
-                                                                               
        }
-                                                                               
}
-                                                                               
break;
-                                                                       }
-                                                                       
default: {
-                                                                               
Integer next = queue.getElementBlocking();
-                                                                               
assertNotNull(next);
-                                                                               
assertEquals(nextExpected, next.intValue());
-                                                                               
nextExpected++;
-                                                                               
count++;
-                                                                       }
-                                                               }
-                                                               
-                                                               // sleep a bit, 
sometimes
-                                                               int sleepTime = 
rnd.nextInt(3);
-                                                               if (sleepTime > 
1) {
-                                                                       
Thread.sleep(sleepTime);
-                                                               }
-                                                       }
-                                               } catch (IllegalStateException 
e) {
-                                                       // we get this once the 
queue is closed
-                                                       
assertEquals(numElements, count);
-                                               }
-                                       } catch (Throwable t) {
-                                               pollErrorRef.set(t);
-                                       }
-                               }
-                       };
-                       poller.start();
-                       
-                       pusher.join();
-                       poller.join();
-                       
-                       if (pushErrorRef.get() != null) {
-                               Throwable t = pushErrorRef.get();
-                               t.printStackTrace();
-                               fail("Error in pusher: " + t.getMessage());
-                       }
-                       if (pollErrorRef.get() != null) {
-                               Throwable t = pollErrorRef.get();
-                               t.printStackTrace();
-                               fail("Error in poller: " + t.getMessage());
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  Utils
-       // 
------------------------------------------------------------------------
-       
-       private static void testCallExitsOnClose(
-                       final QueueCall call, ClosableBlockingQueue<String> 
queue) throws Exception {
-               
-               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
-               
-               Runnable runnable = new Runnable() {
-                       @Override
-                       public void run() {
-                               try {
-                                       call.call();
-                               } catch (Throwable t) {
-                                       errorRef.set(t);
-                               }
-                       }
-               };
-
-               Thread thread = new Thread(runnable);
-               thread.start();
-               Thread.sleep(100);
-               queue.close();
-               thread.join();
-
-               @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-               Throwable cause = errorRef.get();
-               assertTrue(cause instanceof IllegalStateException);
-       }
-       
-       private interface QueueCall {
-               void call() throws Exception;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
deleted file mode 100644
index fbeb110..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,30 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-log4j.logger.org.apache.zookeeper=OFF, testlogger
-log4j.logger.state.change.logger=OFF, testlogger
-log4j.logger.kafka=OFF, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-    <logger name="org.apache.flink.streaming" level="WARN"/>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml 
b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
deleted file mode 100644
index f638c7a..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
+++ /dev/null
@@ -1,212 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0";
-                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-streaming-connectors</artifactId>
-               <version>1.2-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
-       <name>flink-connector-kafka-0.9</name>
-
-       <packaging>jar</packaging>
-
-       <!-- Allow users to pass custom connector versions -->
-       <properties>
-               <kafka.version>0.9.0.1</kafka.version>
-       </properties>
-
-       <dependencies>
-
-               <!-- core dependencies -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-connector-kafka-base_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <exclusions>
-                               <exclusion>
-                                       <groupId>org.apache.kafka</groupId>
-                                       
<artifactId>kafka_${scala.binary.version}</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-                       <!-- Projects depending on this project,
-                       won't depend on flink-table. -->
-                       <optional>true</optional>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.kafka</groupId>
-                       <artifactId>kafka-clients</artifactId>
-                       <version>${kafka.version}</version>
-               </dependency>
-
-               <!-- test dependencies -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-                       <type>test-jar</type>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-connector-kafka-base_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <exclusions>
-                               <!-- exclude 0.8 dependencies -->
-                               <exclusion>
-                                       <groupId>org.apache.kafka</groupId>
-                                       
<artifactId>kafka_${scala.binary.version}</artifactId>
-                               </exclusion>
-                       </exclusions>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <!-- include 0.9 server for tests  -->
-                       <groupId>org.apache.kafka</groupId>
-                       <artifactId>kafka_${scala.binary.version}</artifactId>
-                       <version>${kafka.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-metrics-jmx</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-               
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-tests_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-runtime_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.hadoop</groupId>
-                       <artifactId>hadoop-minikdc</artifactId>
-                       <version>${minikdc.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-jar-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <goals>
-                                                       <goal>test-jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <includes>
-                                                               
<include>**/KafkaTestEnvironmentImpl*</include>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-source-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <id>attach-test-sources</id>
-                                               <goals>
-                                                       
<goal>test-jar-no-fork</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <includes>
-                                                               
<include>**/KafkaTestEnvironmentImpl*</include>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-surefire-plugin</artifactId>
-                               <configuration>
-                                       <!-- Enforce single fork execution due 
to heavy mini cluster use in the tests -->
-                                       <forkCount>1</forkCount>
-                                       <argLine>-Xms256m -Xmx1000m 
-Dlog4j.configuration=${log4j.configuration} 
-Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
-                               </configuration>
-                       </plugin>
-                       <!--
-            https://issues.apache.org/jira/browse/DIRSHARED-134
-            Required to pull the Mini-KDC transitive dependency
-            -->
-                       <plugin>
-                               <groupId>org.apache.felix</groupId>
-                               <artifactId>maven-bundle-plugin</artifactId>
-                               <version>3.0.1</version>
-                               <inherited>true</inherited>
-                               <extensions>true</extensions>
-                       </plugin>
-               </plugins>
-       </build>
-       
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
deleted file mode 100644
index 29bb8e4..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.util.SerializedValue;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The Flink Kafka Consumer is a streaming data source that pulls a parallel 
data stream from
- * Apache Kafka 0.9.x. The consumer can run in multiple parallel instances, 
each of which will pull
- * data from one or more Kafka partitions. 
- * 
- * <p>The Flink Kafka Consumer participates in checkpointing and guarantees 
that no data is lost
- * during a failure, and that the computation processes elements "exactly 
once". 
- * (Note: These guarantees naturally assume that Kafka itself does not loose 
any data.)</p>
- *
- * <p>Please note that Flink snapshots the offsets internally as part of its 
distributed checkpoints. The offsets
- * committed to Kafka / ZooKeeper are only to bring the outside view of 
progress in sync with Flink's view
- * of the progress. That way, monitoring and other jobs can get a view of how 
far the Flink Kafka consumer
- * has consumed a topic.</p>
- *
- * <p>Please refer to Kafka's documentation for the available configuration 
properties:
- * http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
- *
- * <p><b>NOTE:</b> The implementation currently accesses partition metadata 
when the consumer
- * is constructed. That means that the client that submits the program needs 
to be able to
- * reach the Kafka brokers or ZooKeeper.</p>
- */
-public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
-
-       private static final long serialVersionUID = 2324564345203409112L;
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaConsumer09.class);
-
-       /**  Configuration key to change the polling timeout **/
-       public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
-
-
-       /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in 
poll if data is not
-        * available. If 0, returns immediately with any records that are 
available now. */
-       public static final long DEFAULT_POLL_TIMEOUT = 100L;
-
-       // 
------------------------------------------------------------------------
-
-       /** User-supplied properties for Kafka **/
-       protected final Properties properties;
-
-       /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in 
poll if data is not
-        * available. If 0, returns immediately with any records that are 
available now */
-       protected final long pollTimeout;
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a new Kafka streaming source consumer for Kafka 0.9.x
-        *
-        * @param topic
-        *           The name of the topic that should be consumed.
-        * @param valueDeserializer
-        *           The de-/serializer used to convert between Kafka's byte 
messages and Flink's objects.
-        * @param props
-        *           The properties used to configure the Kafka consumer 
client, and the ZooKeeper client.
-        */
-       public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> 
valueDeserializer, Properties props) {
-               this(Collections.singletonList(topic), valueDeserializer, 
props);
-       }
-
-       /**
-        * Creates a new Kafka streaming source consumer for Kafka 0.9.x
-        *
-        * This constructor allows passing a {@see KeyedDeserializationSchema} 
for reading key/value
-        * pairs, offsets, and topic names from Kafka.
-        *
-        * @param topic
-        *           The name of the topic that should be consumed.
-        * @param deserializer
-        *           The keyed de-/serializer used to convert between Kafka's 
byte messages and Flink's objects.
-        * @param props
-        *           The properties used to configure the Kafka consumer 
client, and the ZooKeeper client.
-        */
-       public FlinkKafkaConsumer09(String topic, KeyedDeserializationSchema<T> 
deserializer, Properties props) {
-               this(Collections.singletonList(topic), deserializer, props);
-       }
-
-       /**
-        * Creates a new Kafka streaming source consumer for Kafka 0.9.x
-        *
-        * This constructor allows passing multiple topics to the consumer.
-        *
-        * @param topics
-        *           The Kafka topics to read from.
-        * @param deserializer
-        *           The de-/serializer used to convert between Kafka's byte 
messages and Flink's objects.
-        * @param props
-        *           The properties that are used to configure both the fetcher 
and the offset handler.
-        */
-       public FlinkKafkaConsumer09(List<String> topics, 
DeserializationSchema<T> deserializer, Properties props) {
-               this(topics, new 
KeyedDeserializationSchemaWrapper<>(deserializer), props);
-       }
-
-       /**
-        * Creates a new Kafka streaming source consumer for Kafka 0.9.x
-        *
-        * This constructor allows passing multiple topics and a key/value 
deserialization schema.
-        *
-        * @param topics
-        *           The Kafka topics to read from.
-        * @param deserializer
-        *           The keyed de-/serializer used to convert between Kafka's 
byte messages and Flink's objects.
-        * @param props
-        *           The properties that are used to configure both the fetcher 
and the offset handler.
-        */
-       public FlinkKafkaConsumer09(List<String> topics, 
KeyedDeserializationSchema<T> deserializer, Properties props) {
-               super(topics, deserializer);
-
-               this.properties = checkNotNull(props, "props");
-               setDeserializer(this.properties);
-
-               // configure the polling timeout
-               try {
-                       if (properties.containsKey(KEY_POLL_TIMEOUT)) {
-                               this.pollTimeout = 
Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
-                       } else {
-                               this.pollTimeout = DEFAULT_POLL_TIMEOUT;
-                       }
-               }
-               catch (Exception e) {
-                       throw new IllegalArgumentException("Cannot parse poll 
timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
-               }
-       }
-
-       @Override
-       protected AbstractFetcher<T, ?> createFetcher(
-                       SourceContext<T> sourceContext,
-                       List<KafkaTopicPartition> thisSubtaskPartitions,
-                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
-                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
-                       StreamingRuntimeContext runtimeContext) throws 
Exception {
-
-               boolean useMetrics = 
!Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"));
-
-               return new Kafka09Fetcher<>(
-                               sourceContext,
-                               thisSubtaskPartitions,
-                               watermarksPeriodic,
-                               watermarksPunctuated,
-                               runtimeContext.getProcessingTimeService(),
-                               
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
-                               runtimeContext.getUserCodeClassLoader(),
-                               runtimeContext.isCheckpointingEnabled(),
-                               runtimeContext.getTaskNameWithSubtasks(),
-                               runtimeContext.getMetricGroup(),
-                               deserializer,
-                               properties,
-                               pollTimeout,
-                               useMetrics);
-               
-       }
-
-       @Override
-       protected List<KafkaTopicPartition> getKafkaPartitions(List<String> 
topics) {
-               // read the partitions that belong to the listed topics
-               final List<KafkaTopicPartition> partitions = new ArrayList<>();
-
-               try (KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(this.properties)) {
-                       for (final String topic: topics) {
-                               // get partitions for each topic
-                               List<PartitionInfo> partitionsForTopic = 
consumer.partitionsFor(topic);
-                               // for non existing topics, the list might be 
null.
-                               if (partitionsForTopic != null) {
-                                       
partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
-                               }
-                       }
-               }
-
-               if (partitions.isEmpty()) {
-                       throw new RuntimeException("Unable to retrieve any 
partitions for the requested topics " + topics);
-               }
-
-               // we now have a list of partitions which is the same for all 
parallel consumer instances.
-               LOG.info("Got {} partitions from these topics: {}", 
partitions.size(), topics);
-
-               if (LOG.isInfoEnabled()) {
-                       logPartitionInfo(LOG, partitions);
-               }
-
-               return partitions;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities 
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Converts a list of Kafka PartitionInfo's to Flink's 
KafkaTopicPartition (which are serializable)
-        * 
-        * @param partitions A list of Kafka PartitionInfos.
-        * @return A list of KafkaTopicPartitions
-        */
-       private static List<KafkaTopicPartition> 
convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) {
-               checkNotNull(partitions);
-
-               List<KafkaTopicPartition> ret = new 
ArrayList<>(partitions.size());
-               for (PartitionInfo pi : partitions) {
-                       ret.add(new KafkaTopicPartition(pi.topic(), 
pi.partition()));
-               }
-               return ret;
-       }
-
-       /**
-        * Makes sure that the ByteArrayDeserializer is registered in the Kafka 
properties.
-        * 
-        * @param props The Kafka properties to register the serializer in.
-        */
-       private static void setDeserializer(Properties props) {
-               final String deSerName = 
ByteArrayDeserializer.class.getCanonicalName();
-
-               Object keyDeSer = 
props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
-               Object valDeSer = 
props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-
-               if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
-                       LOG.warn("Ignoring configured key DeSerializer ({})", 
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
-               }
-               if (valDeSer != null && !valDeSer.equals(deSerName)) {
-                       LOG.warn("Ignoring configured value DeSerializer ({})", 
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-               }
-
-               props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
deSerName);
-               props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
deSerName);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
deleted file mode 100644
index 2a3e39d..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import java.util.Properties;
-
-
-/**
- * Flink Sink to produce data into a Kafka topic. This producer is compatible 
with Kafka 0.9.
- *
- * Please note that this producer does not have any reliability guarantees.
- *
- * @param <IN> Type of the messages to write into Kafka.
- */
-public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
-
-       private static final long serialVersionUID = 1L;
-
-       // ------------------- Keyless serialization schema constructors 
----------------------
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
-        * the topic.
-        *
-        * @param brokerList
-        *                      Comma separated addresses of the brokers
-        * @param topicId
-        *                      ID of the Kafka topic.
-        * @param serializationSchema
-        *                      User defined (keyless) serialization schema.
-        */
-       public FlinkKafkaProducer09(String brokerList, String topicId, 
SerializationSchema<IN> serializationSchema) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), 
getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
-       }
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
-        * the topic.
-        *
-        * @param topicId
-        *                      ID of the Kafka topic.
-        * @param serializationSchema
-        *                      User defined (keyless) serialization schema.
-        * @param producerConfig
-        *                      Properties with the producer configuration.
-        */
-       public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
FixedPartitioner<IN>());
-       }
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
-        * the topic.
-        *
-        * @param topicId The topic to write data to
-        * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
-        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
-        */
-       public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, KafkaPartitioner<IN> 
customPartitioner) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
-
-       }
-
-       // ------------------- Key/Value serialization schema constructors 
----------------------
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
-        * the topic.
-        *
-        * @param brokerList
-        *                      Comma separated addresses of the brokers
-        * @param topicId
-        *                      ID of the Kafka topic.
-        * @param serializationSchema
-        *                      User defined serialization schema supporting 
key/value messages
-        */
-       public FlinkKafkaProducer09(String brokerList, String topicId, 
KeyedSerializationSchema<IN> serializationSchema) {
-               this(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
-       }
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
-        * the topic.
-        *
-        * @param topicId
-        *                      ID of the Kafka topic.
-        * @param serializationSchema
-        *                      User defined serialization schema supporting 
key/value messages
-        * @param producerConfig
-        *                      Properties with the producer configuration.
-        */
-       public FlinkKafkaProducer09(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
-               this(topicId, serializationSchema, producerConfig, new 
FixedPartitioner<IN>());
-       }
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
-        * the topic.
-        *
-        * @param topicId The topic to write data to
-        * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
-        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
-        */
-       public FlinkKafkaProducer09(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
KafkaPartitioner<IN> customPartitioner) {
-               super(topicId, serializationSchema, producerConfig, 
customPartitioner);
-       }
-
-       @Override
-       protected void flush() {
-               if (this.producer != null) {
-                       producer.flush();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
deleted file mode 100644
index 38ea47c..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import java.util.Properties;
-
-/**
- * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
- */
-public class Kafka09JsonTableSink extends KafkaJsonTableSink {
-       /**
-        * Creates {@link KafkaTableSink} for Kafka 0.9
-        *
-        * @param topic topic in Kafka to which table is written
-        * @param properties properties to connect to Kafka
-        * @param partitioner Kafka partitioner
-        */
-       public Kafka09JsonTableSink(String topic, Properties properties, 
KafkaPartitioner<Row> partitioner) {
-               super(topic, properties, partitioner);
-       }
-
-       @Override
-       protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, 
Properties properties, SerializationSchema<Row> serializationSchema, 
KafkaPartitioner<Row> partitioner) {
-               return new FlinkKafkaProducer09<>(topic, serializationSchema, 
properties, partitioner);
-       }
-
-       @Override
-       protected Kafka09JsonTableSink createCopy() {
-               return new Kafka09JsonTableSink(topic, properties, partitioner);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
deleted file mode 100644
index 975ef58..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * Kafka {@link StreamTableSource} for Kafka 0.9.
- */
-public class Kafka09JsonTableSource extends KafkaJsonTableSource {
-
-       /**
-        * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
-        *
-        * @param topic      Kafka topic to consume.
-        * @param properties Properties for the Kafka consumer.
-        * @param fieldNames Row field names.
-        * @param fieldTypes Row field types.
-        */
-       public Kafka09JsonTableSource(
-                       String topic,
-                       Properties properties,
-                       String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes) {
-
-               super(topic, properties, fieldNames, fieldTypes);
-       }
-
-       /**
-        * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
-        *
-        * @param topic      Kafka topic to consume.
-        * @param properties Properties for the Kafka consumer.
-        * @param fieldNames Row field names.
-        * @param fieldTypes Row field types.
-        */
-       public Kafka09JsonTableSource(
-                       String topic,
-                       Properties properties,
-                       String[] fieldNames,
-                       Class<?>[] fieldTypes) {
-
-               super(topic, properties, fieldNames, fieldTypes);
-       }
-
-       @Override
-       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
-               return new FlinkKafkaConsumer09<>(topic, deserializationSchema, 
properties);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
deleted file mode 100644
index 03b5040..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * Kafka {@link StreamTableSource} for Kafka 0.9.
- */
-public class Kafka09TableSource extends KafkaTableSource {
-
-       /**
-        * Creates a Kafka 0.9 {@link StreamTableSource}.
-        *
-        * @param topic                 Kafka topic to consume.
-        * @param properties            Properties for the Kafka consumer.
-        * @param deserializationSchema Deserialization schema to use for Kafka 
records.
-        * @param fieldNames            Row field names.
-        * @param fieldTypes            Row field types.
-        */
-       public Kafka09TableSource(
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes) {
-
-               super(topic, properties, deserializationSchema, fieldNames, 
fieldTypes);
-       }
-
-       /**
-        * Creates a Kafka 0.9 {@link StreamTableSource}.
-        *
-        * @param topic                 Kafka topic to consume.
-        * @param properties            Properties for the Kafka consumer.
-        * @param deserializationSchema Deserialization schema to use for Kafka 
records.
-        * @param fieldNames            Row field names.
-        * @param fieldTypes            Row field types.
-        */
-       public Kafka09TableSource(
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       String[] fieldNames,
-                       Class<?>[] fieldTypes) {
-
-               super(topic, properties, deserializationSchema, fieldNames, 
fieldTypes);
-       }
-
-       @Override
-       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
-               return new FlinkKafkaConsumer09<>(topic, deserializationSchema, 
properties);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
deleted file mode 100644
index e6e3c51..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internal;
-
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-
-import javax.annotation.Nonnull;
-import javax.annotation.concurrent.ThreadSafe;
-import java.io.Closeable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The Handover is a utility to hand over data (a buffer of records) and 
exception from a
- * <i>producer</i> thread to a <i>consumer</i> thread. It effectively behaves 
like a
- * "size one blocking queue", with some extras around exception reporting, 
closing, and
- * waking up thread without {@link Thread#interrupt() interrupting} threads.
- * 
- * <p>This class is used in the Flink Kafka Consumer to hand over data and 
exceptions between
- * the thread that runs the KafkaConsumer class and the main thread.
- * 
- * <p>The Handover has the notion of "waking up" the producer thread with a 
{@link WakeupException}
- * rather than a thread interrupt.
- * 
- * <p>The Handover can also be "closed", signalling from one thread to the 
other that it
- * the thread has terminated.
- */
-@ThreadSafe
-public final class Handover implements Closeable {
-
-       private final Object lock = new Object();
-
-       private ConsumerRecords<byte[], byte[]> next;
-       private Throwable error;
-       private boolean wakeupProducer;
-
-       /**
-        * Polls the next element from the Handover, possibly blocking until 
the next element is
-        * available. This method behaves similar to polling from a blocking 
queue.
-        * 
-        * <p>If an exception was handed in by the producer ({@link 
#reportError(Throwable)}), then
-        * that exception is thrown rather than an element being returned.
-        * 
-        * @return The next element (buffer of records, never null).
-        * 
-        * @throws ClosedException Thrown if the Handover was {@link #close() 
closed}.
-        * @throws Exception Rethrows exceptions from the {@link 
#reportError(Throwable)} method.
-        */
-       @Nonnull
-       public ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
-               synchronized (lock) {
-                       while (next == null && error == null) {
-                               lock.wait();
-                       }
-
-                       ConsumerRecords<byte[], byte[]> n = next;
-                       if (n != null) {
-                               next = null;
-                               lock.notifyAll();
-                               return n;
-                       }
-                       else {
-                               ExceptionUtils.rethrowException(error, 
error.getMessage());
-
-                               // this statement cannot be reached since the 
above method always throws an exception
-                               // this is only here to silence the compiler 
and any warnings
-                               return ConsumerRecords.empty(); 
-                       }
-               }
-       }
-
-       /**
-        * Hands over an element from the producer. If the Handover already has 
an element that was
-        * not yet picked up by the consumer thread, this call blocks until the 
consumer picks up that
-        * previous element.
-        * 
-        * <p>This behavior is similar to a "size one" blocking queue.
-        * 
-        * @param element The next element to hand over.
-        * 
-        * @throws InterruptedException
-        *                 Thrown, if the thread is interrupted while blocking 
for the Handover to be empty.
-        * @throws WakeupException
-        *                 Thrown, if the {@link #wakeupProducer()} method is 
called while blocking for
-        *                 the Handover to be empty.
-        * @throws ClosedException
-        *                 Thrown if the Handover was closed or concurrently 
being closed.
-        */
-       public void produce(final ConsumerRecords<byte[], byte[]> element)
-                       throws InterruptedException, WakeupException, 
ClosedException {
-
-               checkNotNull(element);
-
-               synchronized (lock) {
-                       while (next != null && !wakeupProducer) {
-                               lock.wait();
-                       }
-
-                       wakeupProducer = false;
-
-                       // if there is still an element, we must have been 
woken up
-                       if (next != null) {
-                               throw new WakeupException();
-                       }
-                       // if there is no error, then this is open and can 
accept this element
-                       else if (error == null) {
-                               next = element;
-                               lock.notifyAll();
-                       }
-                       // an error marks this as closed for the producer
-                       else {
-                               throw new ClosedException();
-                       }
-               }
-       }
-
-       /**
-        * Reports an exception. The consumer will throw the given exception 
immediately, if
-        * it is currently blocked in the {@link #pollNext()} method, or the 
next time it
-        * calls that method.
-        * 
-        * <p>After this method has been called, no call to either {@link 
#produce(ConsumerRecords)}
-        * or {@link #pollNext()} will ever return regularly any more, but will 
always return
-        * exceptionally.
-        * 
-        * <p>If another exception was already reported, this method does 
nothing.
-        * 
-        * <p>For the producer, the Handover will appear as if it was {@link 
#close() closed}.
-        * 
-        * @param t The exception to report.
-        */
-       public void reportError(Throwable t) {
-               checkNotNull(t);
-
-               synchronized (lock) {
-                       // do not override the initial exception
-                       if (error == null) {
-                               error = t;
-                       }
-                       next = null;
-                       lock.notifyAll();
-               }
-       }
-
-       /**
-        * Closes the handover. Both the {@link #produce(ConsumerRecords)} 
method and the
-        * {@link #pollNext()} will throw a {@link ClosedException} on any 
currently blocking and
-        * future invocations.
-        * 
-        * <p>If an exception was previously reported via the {@link 
#reportError(Throwable)} method,
-        * that exception will not be overridden. The consumer thread will 
throw that exception upon
-        * calling {@link #pollNext()}, rather than the {@code ClosedException}.
-        */
-       @Override
-       public void close() {
-               synchronized (lock) {
-                       next = null;
-                       wakeupProducer = false;
-
-                       if (error == null) {
-                               error = new ClosedException();
-                       }
-                       lock.notifyAll();
-               }
-       }
-
-       /**
-        * Wakes the producer thread up. If the producer thread is currently 
blocked in
-        * the {@link #produce(ConsumerRecords)} method, it will exit the 
method throwing
-        * a {@link WakeupException}.
-        */
-       public void wakeupProducer() {
-               synchronized (lock) {
-                       wakeupProducer = true;
-                       lock.notifyAll();
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * An exception thrown by the Handover in the {@link #pollNext()} or
-        * {@link #produce(ConsumerRecords)} method, after the Handover was 
closed via
-        * {@link #close()}.
-        */
-       public static final class ClosedException extends Exception {
-               private static final long serialVersionUID = 1L;
-       }
-
-       /**
-        * A special exception thrown bv the Handover in the {@link 
#produce(ConsumerRecords)}
-        * method when the producer is woken up from a blocking call via {@link 
#wakeupProducer()}.
-        */
-       public static final class WakeupException extends Exception {
-               private static final long serialVersionUID = 1L;
-       }
-}

Reply via email to