[FLINK-4035] Add support for Kafka 0.10.x.

This closes #2231


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63859c64
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63859c64
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63859c64

Branch: refs/heads/master
Commit: 63859c648a2a0ff024228f6e0687d837b8896322
Parents: a079259
Author: radekg <ra...@gruchalski.com>
Authored: Tue Jul 12 13:19:01 2016 -0400
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Tue Oct 11 10:04:25 2016 +0200

----------------------------------------------------------------------
 .../flink-connector-kafka-0.10/pom.xml          | 179 ++++++++++
 .../connectors/kafka/FlinkKafkaConsumer010.java | 259 +++++++++++++++
 .../connectors/kafka/FlinkKafkaProducer010.java | 137 ++++++++
 .../kafka/Kafka010JsonTableSource.java          |  71 ++++
 .../connectors/kafka/Kafka010TableSource.java   |  75 +++++
 .../kafka/internal/Kafka010Fetcher.java         | 312 +++++++++++++++++
 .../src/main/resources/log4j.properties         |  29 ++
 .../connectors/kafka/Kafka010ITCase.java        | 192 +++++++++++
 .../kafka/Kafka010ProducerITCase.java           |  33 ++
 .../connectors/kafka/KafkaProducerTest.java     | 119 +++++++
 .../kafka/KafkaShortRetention010ITCase.java     |  34 ++
 .../kafka/KafkaTestEnvironmentImpl.java         | 331 +++++++++++++++++++
 .../src/test/resources/log4j-test.properties    |  30 ++
 .../src/test/resources/logback-test.xml         |  30 ++
 flink-streaming-connectors/pom.xml              |   1 +
 15 files changed, 1832 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
new file mode 100644
index 0000000..f2bcb11
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
@@ -0,0 +1,179 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-streaming-connectors</artifactId>
+               <version>1.1-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
+       <name>flink-connector-kafka-0.10</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <kafka.version>0.10.0.0</kafka.version>
+       </properties>
+
+       <dependencies>
+
+               <!-- core dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-kafka-base_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.kafka</groupId>
+                                       <artifactId>kafka_${scala.binary.version}</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+                       <!-- Projects depending on this project won't depend on flink-table. -->
+                       <optional>true</optional>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka-clients</artifactId>
+                       <version>${kafka.version}</version>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-kafka-base_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <!-- exclude 0.8 dependencies -->
+                               <exclusion>
+                                       <groupId>org.apache.kafka</groupId>
+                                       <artifactId>kafka_${scala.binary.version}</artifactId>
+                               </exclusion>
+                       </exclusions>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <!-- include 0.10 server for tests  -->
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka_${scala.binary.version}</artifactId>
+                       <version>${kafka.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-tests_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <includes>
+                                                               <include>**/KafkaTestEnvironmentImpl*</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-source-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>attach-test-sources</id>
+                                               <goals>
+                                                       <goal>test-jar-no-fork</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <includes>
+                                                               <include>**/KafkaTestEnvironmentImpl*</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <configuration>
+                                       <!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
+                                       <forkCount>1</forkCount>
+                                       <argLine>-Xms256m -Xmx1000m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+       
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
new file mode 100644
index 0000000..78ccd4a
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
+ * Apache Kafka 0.10.x. The consumer can run in multiple parallel instances, each of which will pull
+ * data from one or more Kafka partitions.
+ *
+ * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
+ * during a failure, and that the computation processes elements "exactly once".
+ * (Note: These guarantees naturally assume that Kafka itself does not lose any data.)</p>
+ *
+ * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
+ * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view
+ * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer
+ * has consumed a topic.</p>
+ *
+ * <p>Please refer to Kafka's documentation for the available configuration properties:
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
+ *
+ * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
+ * is constructed. That means that the client that submits the program needs to be able to
+ * reach the Kafka brokers or ZooKeeper.</p>
+ */
+public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
+
+       private static final long serialVersionUID = 2324564345203409112L;
+
+       private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer010.class);
+
+       /**  Configuration key to change the polling timeout **/
+       public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
+
+       /** Boolean configuration key to disable metrics tracking **/
+       public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+
+       /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
+        * available. If 0, returns immediately with any records that are available now. */
+       public static final long DEFAULT_POLL_TIMEOUT = 100L;
+
+       // ------------------------------------------------------------------------
+
+       /** User-supplied properties for Kafka **/
+       private final Properties properties;
+
+       /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
+        * available. If 0, returns immediately with any records that are available now */
+       private final long pollTimeout;
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 0.10.x
+        *
+        * @param topic
+        *           The name of the topic that should be consumed.
+        * @param valueDeserializer
+        *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+        * @param props
+        *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+        */
+       public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
+               this(Collections.singletonList(topic), valueDeserializer, props);
+       }
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 0.10.x
+        *
+        * This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
+        * pairs, offsets, and topic names from Kafka.
+        *
+        * @param topic
+        *           The name of the topic that should be consumed.
+        * @param deserializer
+        *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+        * @param props
+        *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+        */
+       public FlinkKafkaConsumer010(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
+               this(Collections.singletonList(topic), deserializer, props);
+       }
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 0.10.x
+        *
+        * This constructor allows passing multiple topics to the consumer.
+        *
+        * @param topics
+        *           The Kafka topics to read from.
+        * @param deserializer
+        *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+        * @param props
+        *           The properties that are used to configure both the fetcher and the offset handler.
+        */
+       public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
+               this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props);
+       }
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 0.10.x
+        *
+        * This constructor allows passing multiple topics and a key/value deserialization schema.
+        *
+        * @param topics
+        *           The Kafka topics to read from.
+        * @param deserializer
+        *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+        * @param props
+        *           The properties that are used to configure both the fetcher and the offset handler.
+        */
+       public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
+               super(deserializer);
+
+               checkNotNull(topics, "topics");
+               this.properties = checkNotNull(props, "props");
+               setDeserializer(this.properties);
+
+               // configure the polling timeout
+               try {
+                       if (properties.containsKey(KEY_POLL_TIMEOUT)) {
+                               this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
+                       } else {
+                               this.pollTimeout = DEFAULT_POLL_TIMEOUT;
+                       }
+               }
+               catch (Exception e) {
+                       throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
+               }
+
+               // read the partitions that belong to the listed topics
+               final List<KafkaTopicPartition> partitions = new ArrayList<>();
+
+               try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) {
+                       for (final String topic: topics) {
+                               // get partitions for each topic
+                               List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic);
+                               // for non existing topics, the list might be null.
+                               if (partitionsForTopic != null) {
+                                       partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
+                               }
+                       }
+               }
+
+               if (partitions.isEmpty()) {
+                       throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics);
+               }
+
+               // we now have a list of partitions which is the same for all parallel consumer instances.
+               LOG.info("Got {} partitions from these topics: {}", partitions.size(), topics);
+
+               if (LOG.isInfoEnabled()) {
+                       logPartitionInfo(LOG, partitions);
+               }
+
+               // register these partitions
+               setSubscribedPartitions(partitions);
+       }
+
+       @Override
+       protected AbstractFetcher<T, ?> createFetcher(
+                       SourceContext<T> sourceContext,
+                       List<KafkaTopicPartition> thisSubtaskPartitions,
+                       SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+                       StreamingRuntimeContext runtimeContext) throws Exception {
+
+               boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"));
+
+               return new Kafka010Fetcher<>(sourceContext, thisSubtaskPartitions,
+                               watermarksPeriodic, watermarksPunctuated,
+                               runtimeContext, deserializer,
+                               properties, pollTimeout, useMetrics);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Utilities
+       // ------------------------------------------------------------------------
+
+       /**
+        * Converts a list of Kafka PartitionInfo's to Flink's KafkaTopicPartition (which are serializable)
+        *
+        * @param partitions A list of Kafka PartitionInfos.
+        * @return A list of KafkaTopicPartitions
+        */
+       private static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) {
+               checkNotNull(partitions);
+
+               List<KafkaTopicPartition> ret = new ArrayList<>(partitions.size());
+               for (PartitionInfo pi : partitions) {
+                       ret.add(new KafkaTopicPartition(pi.topic(), pi.partition()));
+               }
+               return ret;
+       }
+
+       /**
+        * Makes sure that the ByteArrayDeserializer is registered in the Kafka properties.
+        *
+        * @param props The Kafka properties to register the serializer in.
+        */
+       private static void setDeserializer(Properties props) {
+               final String deSerName = ByteArrayDeserializer.class.getCanonicalName();
+
+               Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+               Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+               if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
+                       LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+               }
+               if (valDeSer != null && !valDeSer.equals(deSerName)) {
+                       LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+               }
+
+               props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
+               props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
+       }
+}
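
For reference, a minimal usage sketch of the new consumer as a DataStream source. The broker address, group id, and topic name below are placeholders, and SimpleStringSchema is only an example deserialization schema, not something this commit prescribes:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Kafka010ConsumerExample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.setProperty("group.id", "example-group");             // placeholder consumer group

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // reads the topic as a stream of Strings; partitions are discovered at construction time
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer010<>("example-topic", new SimpleStringSchema(), props));

        stream.print();
        env.execute("Kafka 0.10 consumer example");
    }
}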

http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
new file mode 100644
index 0000000..49bce39
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.
+ *
+ * Please note that this producer does not have any reliability guarantees.
+ *
+ * @param <IN> Type of the messages to write into Kafka.
+ */
+public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> {
+
+       private static final long serialVersionUID = 1L;
+
+       // ------------------- Keyless serialization schema constructors ----------------------
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+        * the topic.
+        *
+        * @param brokerList
+        *                      Comma separated addresses of the brokers
+        * @param topicId
+        *                      ID of the Kafka topic.
+        * @param serializationSchema
+        *                      User defined (keyless) serialization schema.
+        */
+       public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
+               this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+        * the topic.
+        *
+        * @param topicId
+        *                      ID of the Kafka topic.
+        * @param serializationSchema
+        *                      User defined (keyless) serialization schema.
+        * @param producerConfig
+        *                      Properties with the producer configuration.
+        */
+       public FlinkKafkaProducer010(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
+               this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+        * the topic.
+        *
+        * @param topicId The topic to write data to
+        * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+        * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+        */
+       public FlinkKafkaProducer010(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+               this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+
+       }
+
+       // ------------------- Key/Value serialization schema constructors ----------------------
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+        * the topic.
+        *
+        * @param brokerList
+        *                      Comma separated addresses of the brokers
+        * @param topicId
+        *                      ID of the Kafka topic.
+        * @param serializationSchema
+        *                      User defined serialization schema supporting key/value messages
+        */
+       public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
+               this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+        * the topic.
+        *
+        * @param topicId
+        *                      ID of the Kafka topic.
+        * @param serializationSchema
+        *                      User defined serialization schema supporting key/value messages
+        * @param producerConfig
+        *                      Properties with the producer configuration.
+        */
+       public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
+               this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>());
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+        * the topic.
+        *
+        * @param topicId The topic to write data to
+        * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+        * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+        */
+       public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+               super(topicId, serializationSchema, producerConfig, customPartitioner);
+       }
+
+       @Override
+       protected void flush() {
+               if (this.producer != null) {
+                       producer.flush();
+               }
+       }
+}
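
And a matching sketch for the sink side. Broker list and topic are placeholders; SimpleStringSchema doubles here as the keyless SerializationSchema. With the keyless constructors, the diff above defaults to FixedPartitioner for partition assignment:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Kafka010ProducerExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> stream = env.fromElements("a", "b", "c");

        // writes each String to the given topic; keyless, so the FixedPartitioner default applies
        stream.addSink(new FlinkKafkaProducer010<String>(
                "localhost:9092",        // placeholder broker list
                "example-topic",         // placeholder topic
                new SimpleStringSchema()));

        env.execute("Kafka 0.10 producer example");
    }
}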

http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
new file mode 100644
index 0000000..cda68ce
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.10.
+ */
+public class Kafka010JsonTableSource extends KafkaJsonTableSource {
+
+       /**
+        * Creates a Kafka 0.10 JSON {@link StreamTableSource}.
+        *
+        * @param topic      Kafka topic to consume.
+        * @param properties Properties for the Kafka consumer.
+        * @param fieldNames Row field names.
+        * @param fieldTypes Row field types.
+        */
+       public Kafka010JsonTableSource(
+                       String topic,
+                       Properties properties,
+                       String[] fieldNames,
+                       TypeInformation<?>[] fieldTypes) {
+
+               super(topic, properties, fieldNames, fieldTypes);
+       }
+
+       /**
+        * Creates a Kafka 0.10 JSON {@link StreamTableSource}.
+        *
+        * @param topic      Kafka topic to consume.
+        * @param properties Properties for the Kafka consumer.
+        * @param fieldNames Row field names.
+        * @param fieldTypes Row field types.
+        */
+       public Kafka010JsonTableSource(
+                       String topic,
+                       Properties properties,
+                       String[] fieldNames,
+                       Class<?>[] fieldTypes) {
+
+               super(topic, properties, fieldNames, fieldTypes);
+       }
+
+       @Override
+       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+               return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
+       }
+}
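
For reference, a rough sketch of constructing the JSON table source. The topic, connection settings, and the two-column field layout are made-up example values, not part of this change:

import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSource;

public class Kafka010JsonTableSourceExample {

    public static Kafka010JsonTableSource createExampleSource() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.setProperty("group.id", "example-group");             // placeholder consumer group

        // each JSON message is parsed into a Row with a String field and a Long field
        return new Kafka010JsonTableSource(
                "example-topic",
                props,
                new String[] { "user", "clicks" },
                new Class<?>[] { String.class, Long.class });
    }
}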

http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
new file mode 100644
index 0000000..cee1b90
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.10.
+ */
+public class Kafka010TableSource extends KafkaTableSource {
+
+       /**
+        * Creates a Kafka 0.10 {@link StreamTableSource}.
+        *
+        * @param topic                 Kafka topic to consume.
+        * @param properties            Properties for the Kafka consumer.
+        * @param deserializationSchema Deserialization schema to use for Kafka records.
+        * @param fieldNames            Row field names.
+        * @param fieldTypes            Row field types.
+        */
+       public Kafka010TableSource(
+                       String topic,
+                       Properties properties,
+                       DeserializationSchema<Row> deserializationSchema,
+                       String[] fieldNames,
+                       TypeInformation<?>[] fieldTypes) {
+
+               super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
+       }
+
+       /**
+        * Creates a Kafka 0.10 {@link StreamTableSource}.
+        *
+        * @param topic                 Kafka topic to consume.
+        * @param properties            Properties for the Kafka consumer.
+        * @param deserializationSchema Deserialization schema to use for Kafka records.
+        * @param fieldNames            Row field names.
+        * @param fieldTypes            Row field types.
+        */
+       public Kafka010TableSource(
+                       String topic,
+                       Properties properties,
+                       DeserializationSchema<Row> deserializationSchema,
+                       String[] fieldNames,
+                       Class<?>[] fieldTypes) {
+
+               super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
+       }
+
+       @Override
+       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+               return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
+       }
+}
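
The generic variant takes an explicit DeserializationSchema<Row>. A minimal sketch under that assumption; the concrete Row schema is passed in as a parameter because no particular implementation is mandated by this change, and the field layout is again made up:

import java.util.Properties;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.streaming.connectors.kafka.Kafka010TableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class Kafka010TableSourceExample {

    public static Kafka010TableSource createExampleSource(Properties props, DeserializationSchema<Row> rowSchema) {
        // field names/types should mirror whatever rowSchema produces; these are example values
        return new Kafka010TableSource(
                "example-topic",
                props,
                rowSchema,
                new String[] { "user", "clicks" },
                new TypeInformation<?>[] { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO });
    }
}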

http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
new file mode 100644
index 0000000..70f530b
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A fetcher that fetches data from Kafka brokers via the Kafka 0.10 consumer API.
+ *
+ * @param <T> The type of elements produced by the fetcher.
+ */
+public class Kafka010Fetcher<T> extends AbstractFetcher<T, TopicPartition> implements Runnable {
+
+       private static final Logger LOG = LoggerFactory.getLogger(Kafka010Fetcher.class);
+
+       // ------------------------------------------------------------------------
+
+       /** The schema to convert between Kafka's byte messages, and Flink's objects */
+       private final KeyedDeserializationSchema<T> deserializer;
+
+       /** The subtask's runtime context */
+       private final RuntimeContext runtimeContext;
+
+       /** The configuration for the Kafka consumer */
+       private final Properties kafkaProperties;
+
+       /** The maximum number of milliseconds to wait for a fetch batch */
+       private final long pollTimeout;
+
+       /** Flag whether to register Kafka metrics as Flink accumulators */
+       private final boolean forwardKafkaMetrics;
+
+       /** Mutex to guard against concurrent access to the non-threadsafe Kafka consumer */
+       private final Object consumerLock = new Object();
+
+       /** Reference to the Kafka consumer, once it is created */
+       private volatile KafkaConsumer<byte[], byte[]> consumer;
+
+       /** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */
+       private volatile ExceptionProxy errorHandler;
+
+       /** Flag to mark the main work loop as alive */
+       private volatile boolean running = true;
+
+       // ------------------------------------------------------------------------
+
+       public Kafka010Fetcher(
+                       SourceContext<T> sourceContext,
+                       List<KafkaTopicPartition> assignedPartitions,
+                       SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+                       StreamingRuntimeContext runtimeContext,
+                       KeyedDeserializationSchema<T> deserializer,
+                       Properties kafkaProperties,
+                       long pollTimeout,
+                       boolean forwardKafkaMetrics) throws Exception
+       {
+               super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext);
+
+               this.deserializer = deserializer;
+               this.runtimeContext = runtimeContext;
+               this.kafkaProperties = kafkaProperties;
+               this.pollTimeout = pollTimeout;
+               this.forwardKafkaMetrics = forwardKafkaMetrics;
+
+               // if checkpointing is enabled, we are not automatically committing to Kafka.
+               kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+                               Boolean.toString(!runtimeContext.isCheckpointingEnabled()));
+       }
+
+       // ------------------------------------------------------------------------
+       //  Fetcher work methods
+       // ------------------------------------------------------------------------
+
+       @Override
+       public void runFetchLoop() throws Exception {
+               this.errorHandler = new ExceptionProxy(Thread.currentThread());
+
+               // rather than running the main fetch loop directly here, we spawn a dedicated thread
+               // this makes sure that no interrupt() call upon canceling reaches the Kafka consumer code
+               Thread runner = new Thread(this, "Kafka 0.10 Fetcher for " + runtimeContext.getTaskNameWithSubtasks());
+               runner.setDaemon(true);
+               runner.start();
+
+               try {
+                       runner.join();
+               } catch (InterruptedException e) {
+                       // may be the result of a wake-up after an exception. we ignore this here and only
+                       // restore the interruption state
+                       Thread.currentThread().interrupt();
+               }
+
+               // make sure we propagate any exception that occurred in the concurrent fetch thread,
+               // before leaving this method
+               this.errorHandler.checkAndThrowException();
+       }
+
+       @Override
+       public void cancel() {
+               // flag the main thread to exit
+               running = false;
+
+               // NOTE:
+               //   - We cannot interrupt the runner thread, because the Kafka consumer may
+               //     deadlock when the thread is interrupted while in certain methods
+               //   - We cannot call close() on the consumer, because it will actually throw
+               //     an exception if a concurrent call is in progress
+
+               // make sure the consumer finds out faster that we are shutting down
+               if (consumer != null) {
+                       consumer.wakeup();
+               }
+       }
+
+       @Override
+       public void run() {
+               // This method initializes the KafkaConsumer and guarantees it is torn down properly.
+               // This is important, because the consumer has multi-threading issues,
+               // including concurrent 'close()' calls.
+
+               final KafkaConsumer<byte[], byte[]> consumer;
+               try {
+                       consumer = new KafkaConsumer<>(kafkaProperties);
+               }
+               catch (Throwable t) {
+                       running = false;
+                       errorHandler.reportError(t);
+                       return;
+               }
+
+               // from here on, the consumer will be closed properly
+               try {
+                       consumer.assign(convertKafkaPartitions(subscribedPartitions()));
+
+                       // register Kafka metrics to Flink accumulators
+                       if (forwardKafkaMetrics) {
+                               Map<MetricName, ? extends Metric> metrics = consumer.metrics();
+                               if (metrics == null) {
+                                       // MapR's Kafka implementation returns null here.
+                                       LOG.info("Consumer implementation does not support metrics");
+                               } else {
+                                       // we have metrics, register them where possible
+                                       for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
+                                               String name = "KafkaConsumer-" + metric.getKey().name();
+                                               DefaultKafkaMetricAccumulator kafkaAccumulator =
+                                                               DefaultKafkaMetricAccumulator.createFor(metric.getValue());
+
+                                               // best effort: we only add the accumulator if available.
+                                               if (kafkaAccumulator != null) {
+                                                       runtimeContext.addAccumulator(name, kafkaAccumulator);
+                                               }
+                                       }
+                               }
+                       }
+
+                       // seek the consumer to the initial offsets
+                       for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+                               if (partition.isOffsetDefined()) {
+                                       consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
+                               }
+                       }
+
+                       // from now on, external operations may call the consumer
+                       this.consumer = consumer;
+
+                       // main fetch loop
+                       while (running) {
+                               // get the next batch of records
+                               final ConsumerRecords<byte[], byte[]> records;
+                               synchronized (consumerLock) {
+                                       try {
+                                               records = consumer.poll(pollTimeout);
+                                       }
+                                       catch (WakeupException we) {
+                                               if (running) {
+                                                       throw we;
+                                               } else {
+                                                       continue;
+                                               }
+                                       }
+                               }
+
+                               // get the records for each topic partition
+                               for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+
+                                       List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition.getKafkaPartitionHandle());
+
+                                       for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+                                               T value = deserializer.deserialize(
+                                                               record.key(), record.value(),
+                                                               record.topic(), record.partition(), record.offset());
+
+                                               if (deserializer.isEndOfStream(value)) {
+                                                       // end of stream signaled
+                                                       running = false;
+                                                       break;
+                                               }
+
+                                               // emit the actual record. this also updates offset state atomically
+                                               // and deals with timestamps and watermark generation
+                                               emitRecord(value, partition, record.offset());
+                                       }
+                               }
+                       }
+                       // end main fetch loop
+               }
+               catch (Throwable t) {
+                       if (running) {
+                               running = false;
+                               errorHandler.reportError(t);
+                       } else {
+                               LOG.debug("Stopped ConsumerThread threw 
exception", t);
+                       }
+               }
+               finally {
+                       try {
+                               synchronized (consumerLock) {
+                                       consumer.close();
+                               }
+                       } catch (Throwable t) {
+                               LOG.warn("Error while closing Kafka 0.10 
consumer", t);
+                       }
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Kafka 0.10 specific fetcher behavior
+       // ------------------------------------------------------------------------
+
+       @Override
+       public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
+               return new TopicPartition(partition.getTopic(), partition.getPartition());
+       }
+
+       @Override
+       public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+               KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions();
+               Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
+
+               for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
+                       Long offset = offsets.get(partition.getKafkaTopicPartition());
+                       if (offset != null) {
+                               offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offset, ""));
+                       }
+               }
+
+               if (this.consumer != null) {
+                       synchronized (consumerLock) {
+                               this.consumer.commitSync(offsetsToCommit);
+                       }
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Utilities
+       // ------------------------------------------------------------------------
+
+       public static Collection<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
+               ArrayList<TopicPartition> result = new ArrayList<>(partitions.length);
+               for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
+                       result.add(p.getKafkaPartitionHandle());
+               }
+               return result;
+       }
+}
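
Since the fetcher's constructor disables Kafka's auto-commit whenever checkpointing is enabled, committed offsets then come from the checkpointing path (commitSpecificOffsetsToKafka()) rather than from the client. A hedged job-side sketch of enabling that mode together with the poll-timeout knob the fetcher honours; the interval, timeout, and connection values are placeholders:

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class CheckpointedKafka010Job {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);   // placeholder: checkpoint every 5 seconds

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.setProperty("group.id", "example-group");             // placeholder consumer group
        props.setProperty("flink.poll-timeout", "100");             // KEY_POLL_TIMEOUT, in milliseconds

        env.addSource(new FlinkKafkaConsumer010<>("example-topic", new SimpleStringSchema(), props))
           .print();

        env.execute("Checkpointed Kafka 0.10 job");
    }
}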

http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6bdfb48
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
new file mode 100644
index 0000000..5427853
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+public class Kafka010ITCase extends KafkaConsumerTestBase {
+
+       // ------------------------------------------------------------------------
+       //  Suite of Tests
+       // ------------------------------------------------------------------------
+
+       @Override
+       public String getExpectedKafkaVersion() {
+               return "0.10";
+       }
+
+       @Test(timeout = 60000)
+       public void testFailOnNoBroker() throws Exception {
+               runFailOnNoBrokerTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testConcurrentProducerConsumerTopology() throws Exception {
+               runSimpleConcurrentProducerConsumerTopology();
+       }
+
+//     @Test(timeout = 60000)
+//     public void testPunctuatedExplicitWMConsumer() throws Exception {
+//             runExplicitPunctuatedWMgeneratingConsumerTest(false);
+//     }
+
+//     @Test(timeout = 60000)
+//     public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
+//             runExplicitPunctuatedWMgeneratingConsumerTest(true);
+//     }
+
+       @Test(timeout = 60000)
+       public void testKeyValueSupport() throws Exception {
+               runKeyValueTest();
+       }
+
+       // --- canceling / failures ---
+
+       @Test(timeout = 60000)
+       public void testCancelingEmptyTopic() throws Exception {
+               runCancelingOnEmptyInputTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testCancelingFullTopic() throws Exception {
+               runCancelingOnFullInputTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testFailOnDeploy() throws Exception {
+               runFailOnDeployTest();
+       }
+
+
+       // --- source to partition mappings and exactly once ---
+
+       @Test(timeout = 60000)
+       public void testOneToOneSources() throws Exception {
+               runOneToOneExactlyOnceTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testOneSourceMultiplePartitions() throws Exception {
+               runOneSourceMultiplePartitionsExactlyOnceTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testMultipleSourcesOnePartition() throws Exception {
+               runMultipleSourcesOnePartitionExactlyOnceTest();
+       }
+
+       // --- broker failure ---
+
+       @Test(timeout = 60000)
+       public void testBrokerFailure() throws Exception {
+               runBrokerFailureTest();
+       }
+
+       // --- special executions ---
+
+       @Test(timeout = 60000)
+       public void testBigRecordJob() throws Exception {
+               runBigRecordTestTopology();
+       }
+
+       @Test(timeout = 60000)
+       public void testMultipleTopics() throws Exception {
+               runProduceConsumeMultipleTopics();
+       }
+
+       @Test(timeout = 60000)
+       public void testAllDeletes() throws Exception {
+               runAllDeletesTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testMetricsAndEndOfStream() throws Exception {
+               runMetricsAndEndOfStreamTest();
+       }
+
+       @Test
+       public void testJsonTableSource() throws Exception {
+               String topic = UUID.randomUUID().toString();
+
+               // Names and types are determined in the actual test method of the
+               // base test class.
+               Kafka010JsonTableSource tableSource = new Kafka010JsonTableSource(
+                               topic,
+                               standardProps,
+                               new String[] {
+                                               "long",
+                                               "string",
+                                               "boolean",
+                                               "double",
+                                               "missing-field"},
+                               new TypeInformation<?>[] {
+                                               BasicTypeInfo.LONG_TYPE_INFO,
+                                               BasicTypeInfo.STRING_TYPE_INFO,
+                                               BasicTypeInfo.BOOLEAN_TYPE_INFO,
+                                               BasicTypeInfo.DOUBLE_TYPE_INFO,
+                                               BasicTypeInfo.LONG_TYPE_INFO });
+
+               // Don't fail on missing field, but set to null (default)
+               tableSource.setFailOnMissingField(false);
+
+               runJsonTableSource(topic, tableSource);
+       }
+
+       @Test
+       public void testJsonTableSourceWithFailOnMissingField() throws Exception {
+               String topic = UUID.randomUUID().toString();
+
+               // Names and types are determined in the actual test method of the
+               // base test class.
+               Kafka010JsonTableSource tableSource = new Kafka010JsonTableSource(
+                               topic,
+                               standardProps,
+                               new String[] {
+                                               "long",
+                                               "string",
+                                               "boolean",
+                                               "double",
+                                               "missing-field"},
+                               new TypeInformation<?>[] {
+                                               BasicTypeInfo.LONG_TYPE_INFO,
+                                               BasicTypeInfo.STRING_TYPE_INFO,
+                                               BasicTypeInfo.BOOLEAN_TYPE_INFO,
+                                               BasicTypeInfo.DOUBLE_TYPE_INFO,
+                                               BasicTypeInfo.LONG_TYPE_INFO });
+
+               // Fail on a missing field (overrides the default behavior of setting it to null)
+               tableSource.setFailOnMissingField(true);
+
+               try {
+                       runJsonTableSource(topic, tableSource);
+                       fail("Did not throw expected Exception");
+               } catch (Exception e) {
+                       Throwable rootCause = e.getCause().getCause().getCause();
+                       assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException);
+               }
+       }
+
+}
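
Outside of the tests, the JSON table source is constructed the same way. A small sketch with made-up field names and a placeholder broker address (none of these values are part of the commit):

    import java.util.Properties;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSource;

    public class Kafka010TableSourceSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.setProperty("group.id", "table-source-sketch");     // placeholder group

            // Same constructor shape the test above exercises: topic, properties, field names, field types.
            Kafka010JsonTableSource tableSource = new Kafka010JsonTableSource(
                    "sketch-json-topic",
                    props,
                    new String[] {"id", "name"},
                    new TypeInformation<?>[] {BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO});

            // Missing JSON fields become null instead of failing the job (the default, as in the first test).
            tableSource.setFailOnMissingField(false);

            // tableSource would then be handed to a Table environment / query as usual.
        }
    }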

http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
new file mode 100644
index 0000000..42b9682
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import org.junit.Test;
+
+
+@SuppressWarnings("serial")
+public class Kafka010ProducerITCase extends KafkaProducerTestBase {
+
+       @Test
+       public void testCustomPartitioning() {
+               runCustomPartitioningTest();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
new file mode 100644
index 0000000..5f5ac63
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FlinkKafkaProducerBase.class)
+public class KafkaProducerTest extends TestLogger {
+       
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testPropagateExceptions() {
+               try {
+                       // mock kafka producer
+                       KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
+                       
+                       // partition setup
+                       when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
+                                       Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));
+
+                       // failure when trying to send an element
+                       when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
+                               .thenAnswer(new Answer<Future<RecordMetadata>>() {
+                                       @Override
+                                       public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
+                                               Callback callback = (Callback) invocation.getArguments()[1];
+                                               callback.onCompletion(null, new Exception("Test error"));
+                                               return null;
+                                       }
+                               });
+                       
+                       // make sure the FlinkKafkaProducer instantiates our mock producer
+                       whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
+                       
+                       // (1) producer that propagates errors
+
+                       FlinkKafkaProducer010<String> producerPropagating = new FlinkKafkaProducer010<>(
+                                       "mock_topic", new SimpleStringSchema(), new Properties(), null);
+
+                       producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3));
+                       producerPropagating.open(new Configuration());
+                       
+                       try {
+                               producerPropagating.invoke("value");
+                               producerPropagating.invoke("value");
+                               fail("This should fail with an exception");
+                       }
+                       catch (Exception e) {
+                               assertNotNull(e.getCause());
+                               assertNotNull(e.getCause().getMessage());
+                               assertTrue(e.getCause().getMessage().contains("Test error"));
+                       }
+
+                       // (2) producer that only logs errors
+
+                       FlinkKafkaProducer010<String> producerLogging = new FlinkKafkaProducer010<>(
+                                       "mock_topic", new SimpleStringSchema(), new Properties(), null);
+                       producerLogging.setLogFailuresOnly(true);
+
+                       producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3));
+                       producerLogging.open(new Configuration());
+
+                       producerLogging.invoke("value");
+                       producerLogging.invoke("value");
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}
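
For comparison with the mocked setup above, a minimal sketch of wiring the new producer into a real job. It uses the same four-argument constructor as the test (topic, serialization schema, properties, partitioner), with placeholder topic and broker values and a null custom partitioner as in the test:

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class Kafka010ProducerSketch {

        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker

            DataStream<String> stream = env.fromElements("a", "b", "c");

            // Same four-argument constructor the test above exercises; null partitioner as in the test.
            FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
                    "sketch-topic", new SimpleStringSchema(), props, null);
            producer.setLogFailuresOnly(false);
            producer.setFlushOnCheckpoint(true);

            stream.addSink(producer);
            env.execute("Kafka 0.10 producer sketch");
        }
    }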

http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
new file mode 100644
index 0000000..1d36198
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class KafkaShortRetention010ITCase extends KafkaShortRetentionTestBase {
+
+       @Test(timeout=60000)
+       public void testAutoOffsetReset() throws Exception {
+               runAutoOffsetResetTest();
+       }
+
+       @Test(timeout=60000)
+       public void testAutoOffsetResetNone() throws Exception {
+               runFailOnAutoOffsetResetNone();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
new file mode 100644
index 0000000..45f0478
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.common.KafkaException;
+import kafka.network.SocketServer;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.SystemTime$;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * An implementation of the KafkaServerProvider for Kafka 0.10
+ */
+public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
+
+       protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+       private File tmpZkDir;
+       private File tmpKafkaParent;
+       private List<File> tmpKafkaDirs;
+       private List<KafkaServer> brokers;
+       private TestingServer zookeeper;
+       private String zookeeperConnectionString;
+       private String brokerConnectionString = "";
+       private Properties standardProps;
+       private Properties additionalServerProperties;
+
+       public String getBrokerConnectionString() {
+               return brokerConnectionString;
+       }
+
+       @Override
+       public Properties getStandardProperties() {
+               return standardProps;
+       }
+
+       @Override
+       public String getVersion() {
+               return "0.10";
+       }
+
+       @Override
+       public List<KafkaServer> getBrokers() {
+               return brokers;
+       }
+
+       @Override
+       public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
+               return new FlinkKafkaConsumer010<>(topics, readSchema, props);
+       }
+
+       @Override
+       public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+               FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
+               prod.setFlushOnCheckpoint(true);
+               return prod;
+       }
+
+       @Override
+       public void restartBroker(int leaderId) throws Exception {
+               brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
+       }
+
+       @Override
+       public int getLeaderToShutDown(String topic) throws Exception {
+               ZkUtils zkUtils = getZkUtils();
+               try {
+                       MetadataResponse.PartitionMetadata firstPart = null;
+                       do {
+                               if (firstPart != null) {
+                                       LOG.info("Unable to find leader. error code {}", firstPart.error().code());
+                                       // not the first try. Sleep a bit
+                                       Thread.sleep(150);
+                               }
+
+                               List<MetadataResponse.PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionMetadata();
+                               firstPart = partitionMetadata.get(0);
+                       }
+                       while (firstPart.error().code() != 0);
+
+                       return firstPart.leader().id();
+               } finally {
+                       zkUtils.close();
+               }
+       }
+
+       @Override
+       public int getBrokerId(KafkaServer server) {
+               return server.config().brokerId();
+       }
+
+       @Override
+       public void prepare(int numKafkaServers, Properties additionalServerProperties) {
+               this.additionalServerProperties = additionalServerProperties;
+               File tempDir = new File(System.getProperty("java.io.tmpdir"));
+
+               tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
+               assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
+
+               tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString()));
+               assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
+
+               tmpKafkaDirs = new ArrayList<>(numKafkaServers);
+               for (int i = 0; i < numKafkaServers; i++) {
+                       File tmpDir = new File(tmpKafkaParent, "server-" + i);
+                       assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
+                       tmpKafkaDirs.add(tmpDir);
+               }
+
+               zookeeper = null;
+               brokers = null;
+
+               try {
+                       LOG.info("Starting Zookeeper");
+                       zookeeper = new TestingServer(-1, tmpZkDir);
+                       zookeeperConnectionString = zookeeper.getConnectString();
+
+                       LOG.info("Starting KafkaServer");
+                       brokers = new ArrayList<>(numKafkaServers);
+
+                       for (int i = 0; i < numKafkaServers; i++) {
+                               brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
+
+                               SocketServer socketServer = brokers.get(i).socketServer();
+                               brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, socketServer.boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+                       }
+
+                       LOG.info("ZK and KafkaServer started.");
+               }
+               catch (Throwable t) {
+                       t.printStackTrace();
+                       fail("Test setup failed: " + t.getMessage());
+               }
+
+               standardProps = new Properties();
+               standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+               standardProps.setProperty("bootstrap.servers", brokerConnectionString);
+               standardProps.setProperty("group.id", "flink-tests");
+               standardProps.setProperty("auto.commit.enable", "false");
+               standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis.
+               standardProps.setProperty("zookeeper.connection.timeout.ms", "30000");
+               standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.10 value)
+               standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+       }
+
+       @Override
+       public void shutdown() {
+               for (KafkaServer broker : brokers) {
+                       if (broker != null) {
+                               broker.shutdown();
+                       }
+               }
+               brokers.clear();
+
+               if (zookeeper != null) {
+                       try {
+                               zookeeper.stop();
+                       }
+                       catch (Exception e) {
+                               LOG.warn("ZK.stop() failed", e);
+                       }
+                       zookeeper = null;
+               }
+
+               // clean up the temp spaces
+
+               if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+                       try {
+                               FileUtils.deleteDirectory(tmpKafkaParent);
+                       }
+                       catch (Exception e) {
+                               // ignore
+                       }
+               }
+               if (tmpZkDir != null && tmpZkDir.exists()) {
+                       try {
+                               FileUtils.deleteDirectory(tmpZkDir);
+                       }
+                       catch (Exception e) {
+                               // ignore
+                       }
+               }
+       }
+
+       public ZkUtils getZkUtils() {
+               ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+                               Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
+               return ZkUtils.apply(creator, false);
+       }
+
+       @Override
+       public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
+               // create topic with one client
+               LOG.info("Creating topic {}", topic);
+
+               ZkUtils zkUtils = getZkUtils();
+               try {
+                       AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig, new kafka.admin.RackAwareMode.Enforced$());
+               } finally {
+                       zkUtils.close();
+               }
+
+               // validate that the topic has been created
+               final long deadline = System.currentTimeMillis() + 30000;
+               do {
+                       try {
+                               Thread.sleep(100);
+                       } catch (InterruptedException e) {
+                               // restore interrupted state
+                       }
+                       // we could use AdminUtils.topicExists(zkUtils, topic) here, but its results are
+                       // not always correct.
+
+                       // create a new ZK utils connection
+                       ZkUtils checkZKConn = getZkUtils();
+                       if (AdminUtils.topicExists(checkZKConn, topic)) {
+                               checkZKConn.close();
+                               return;
+                       }
+                       checkZKConn.close();
+               }
+               while (System.currentTimeMillis() < deadline);
+               fail("Test topic could not be created");
+       }
+
+       @Override
+       public void deleteTestTopic(String topic) {
+               ZkUtils zkUtils = getZkUtils();
+               try {
+                       LOG.info("Deleting topic {}", topic);
+
+                       ZkClient zk = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+                               Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
+
+                       AdminUtils.deleteTopic(zkUtils, topic);
+
+                       zk.close();
+               } finally {
+                       zkUtils.close();
+               }
+       }
+
+       /**
+        * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+        */
+       protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
+               Properties kafkaProperties = new Properties();
+
+               // properties have to be Strings
+               kafkaProperties.put("advertised.host.name", KAFKA_HOST);
+               kafkaProperties.put("broker.id", Integer.toString(brokerId));
+               kafkaProperties.put("log.dir", tmpFolder.toString());
+               kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+               kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
+               kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
+
+               // for CI stability, increase zookeeper session timeout
+               kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
+               kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
+               if (additionalServerProperties != null) {
+                       kafkaProperties.putAll(additionalServerProperties);
+               }
+
+               final int numTries = 5;
+
+               for (int i = 1; i <= numTries; i++) {
+                       int kafkaPort = NetUtils.getAvailablePort();
+                       kafkaProperties.put("port", Integer.toString(kafkaPort));
+                       KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+                       try {
+                               scala.Option<String> stringNone = scala.Option.apply(null);
+                               KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone);
+                               server.startup();
+                               return server;
+                       }
+                       catch (KafkaException e) {
+                               if (e.getCause() instanceof BindException) {
+                                       // port conflict, retry...
+                                       LOG.info("Port conflict when starting Kafka Broker. Retrying...");
+                               }
+                               else {
+                                       throw e;
+                               }
+                       }
+               }
+
+               throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
+       }
+
+}
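
A short sketch of driving this test environment directly, assuming the class is on the test classpath; the partition and replication counts and the topic name are arbitrary, chosen only for illustration:

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;

    public class Kafka010TestEnvSketch {

        public static void main(String[] args) {
            KafkaTestEnvironmentImpl kafkaEnv = new KafkaTestEnvironmentImpl();

            // Start one embedded broker plus ZooKeeper; no additional broker properties.
            kafkaEnv.prepare(1, null);
            try {
                // Methods shown in the diff above: topic management and client properties.
                kafkaEnv.createTestTopic("sketch-topic", 2, 1, new Properties());

                Properties clientProps = kafkaEnv.getStandardProperties();
                System.out.println("brokers:  " + kafkaEnv.getBrokerConnectionString());
                System.out.println("group.id: " + clientProps.getProperty("group.id"));

                kafkaEnv.deleteTestTopic("sketch-topic");
            } finally {
                kafkaEnv.shutdown();
            }
        }
    }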

http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..fbeb110
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties
@@ -0,0 +1,30 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+log4j.logger.org.apache.zookeeper=OFF, testlogger
+log4j.logger.state.change.logger=OFF, testlogger
+log4j.logger.kafka=OFF, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/pom.xml 
b/flink-streaming-connectors/pom.xml
index 113109b..78e39ca 100644
--- a/flink-streaming-connectors/pom.xml
+++ b/flink-streaming-connectors/pom.xml
@@ -40,6 +40,7 @@ under the License.
                <module>flink-connector-kafka-base</module>
                <module>flink-connector-kafka-0.8</module>
                <module>flink-connector-kafka-0.9</module>
+               <module>flink-connector-kafka-0.10</module>
                <module>flink-connector-elasticsearch</module>
                <module>flink-connector-elasticsearch2</module>
                <module>flink-connector-rabbitmq</module>
