http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
new file mode 100644
index 0000000..9faa249
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kafka Consumer is a streaming data source that pulls a parallel 
data stream from
+ * Apache Kafka 0.9.x. The consumer can run in multiple parallel instances, 
each of which will pull
+ * data from one or more Kafka partitions. 
+ * 
+ * <p>The Flink Kafka Consumer participates in checkpointing and guarantees 
that no data is lost
+ * during a failure, and that the computation processes elements "exactly 
once". 
+ * (Note: These guarantees naturally assume that Kafka itself does not lose 
any data.)</p>
+ *
+ * <p>Please note that Flink snapshots the offsets internally as part of its 
distributed checkpoints. The offsets
+ * committed to Kafka / ZooKeeper are only to bring the outside view of 
progress in sync with Flink's view
+ * of the progress. That way, monitoring and other jobs can get a view of how 
far the Flink Kafka consumer
+ * has consumed a topic.</p>
+ *
+ * <p>Please refer to Kafka's documentation for the available configuration 
properties:
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
+ *
+ * <p><b>NOTE:</b> The implementation currently accesses partition metadata 
when the consumer
+ * is constructed. That means that the client that submits the program needs 
to be able to
+ * reach the Kafka brokers or ZooKeeper.</p>
+ */
+public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
+
+       // 
------------------------------------------------------------------------
+       
+       private static final long serialVersionUID = 2324564345203409112L;
+       
+       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaConsumer09.class);
+
+       /**  Configuration key to change the polling timeout **/
+       public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
+
+       /** Boolean configuration key to disable metrics tracking **/
+       public static final String KEY_DISABLE_METRICS = 
"flink.disable-metrics";
+
+       /**
+        * From Kafka's Javadoc: The time, in milliseconds, spent waiting in 
poll if data is not
+        * available. If 0, returns immediately with any records that are 
available now.
+        */
+       public static final long DEFAULT_POLL_TIMEOUT = 100L;
+
+       /** User-supplied properties for Kafka **/
+       private final Properties properties;
+       /** Ordered list of all partitions available in all subscribed 
topics **/
+       private final List<KafkaTopicPartition> partitionInfos;
+
+       // ------  Runtime State  -------
+
+       /** The partitions actually handled by this consumer at runtime */
+       private transient List<TopicPartition> subscribedPartitions;
+       /** For performance reasons, we are keeping two representations of the 
subscribed partitions **/
+       private transient List<KafkaTopicPartition> subscribedPartitionsAsFlink;
+       /** The Kafka Consumer instance **/
+       private transient KafkaConsumer<byte[], byte[]> consumer;
+       /** The thread running Kafka's consumer **/
+       private transient ConsumerThread<T> consumerThread;
+       /** Exception set from the ConsumerThread */
+       private transient Throwable consumerThreadException;
+       /** If the consumer doesn't have a Kafka partition assigned at runtime, 
it'll block on this waitThread **/
+       private transient Thread waitThread;
+
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 0.9.x
+        *
+        * @param topic
+        *           The name of the topic that should be consumed.
+        * @param valueDeserializer
+        *           The de-/serializer used to convert between Kafka's byte 
messages and Flink's objects.
+        * @param props
+        *           The properties used to configure the Kafka consumer 
client, and the ZooKeeper client.
+        */
+       public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> 
valueDeserializer, Properties props) {
+               this(Collections.singletonList(topic), valueDeserializer, 
props);
+       }
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 0.9.x
+        *
+        * This constructor allows passing a {@link KeyedDeserializationSchema} 
for reading key/value
+        * pairs, offsets, and topic names from Kafka.
+        *
+        * @param topic
+        *           The name of the topic that should be consumed.
+        * @param deserializer
+        *           The keyed de-/serializer used to convert between Kafka's 
byte messages and Flink's objects.
+        * @param props
+        *           The properties used to configure the Kafka consumer 
client, and the ZooKeeper client.
+        */
+       public FlinkKafkaConsumer09(String topic, KeyedDeserializationSchema<T> 
deserializer, Properties props) {
+               this(Collections.singletonList(topic), deserializer, props);
+       }
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 0.9.x
+        *
+        * This constructor allows passing multiple topics to the consumer.
+        *
+        * @param topics
+        *           The Kafka topics to read from.
+        * @param deserializer
+        *           The de-/serializer used to convert between Kafka's byte 
messages and Flink's objects.
+        * @param props
+        *           The properties that are used to configure both the fetcher 
and the offset handler.
+        */
+       public FlinkKafkaConsumer09(List<String> topics, 
DeserializationSchema<T> deserializer, Properties props) {
+               this(topics, new 
KeyedDeserializationSchemaWrapper<>(deserializer), props);
+       }
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 0.9.x
+        *
+        * This constructor allows passing multiple topics and a key/value 
deserialization schema.
+        *
+        * @param topics
+        *           The Kafka topics to read from.
+        * @param deserializer
+        *           The keyed de-/serializer used to convert between Kafka's 
byte messages and Flink's objects.
+        * @param props
+        *           The properties that are used to configure both the fetcher 
and the offset handler.
+        */
+       public FlinkKafkaConsumer09(List<String> topics, 
KeyedDeserializationSchema<T> deserializer, Properties props) {
+               super(deserializer, props);
+               checkNotNull(topics, "topics");
+               this.properties = checkNotNull(props, "props");
+               setDeserializer(this.properties);
+               KafkaConsumer<byte[], byte[]> consumer = null;
+               try {
+                       consumer = new KafkaConsumer<>(this.properties);
+                       this.partitionInfos = new ArrayList<>();
+                       for (final String topic: topics) {
+                               // get partitions for each topic
+                               List<PartitionInfo> partitionsForTopic = null;
+                               for(int tri = 0; tri < 10; tri++) {
+                                       LOG.info("Trying to get partitions for 
topic {}", topic);
+                                       try {
+                                               partitionsForTopic = 
consumer.partitionsFor(topic);
+                                               if(partitionsForTopic != null 
&& partitionsForTopic.size() > 0) {
+                                                       break; // it worked
+                                               }
+                                       } catch (NullPointerException npe) {
+                                               // workaround for KAFKA-2880: 
Fetcher.getTopicMetadata NullPointerException when broker cannot be reached
+                                               // we ignore the NPE.
+                                       }
+                                       // create a new consumer
+                                       consumer.close();
+                                       try {
+                                               Thread.sleep(1000);
+                                       } catch (InterruptedException e) {
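+                                               // interrupted during back-off; fall through and retry with a fresh consumer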
+                                       }
+                                       consumer = new 
KafkaConsumer<>(properties);
+                               }
+                               // for non-existent topics, the list might be 
null.
+                               if(partitionsForTopic != null) {
+                                       
partitionInfos.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
+                               }
+                       }
+               } finally {
+                       if(consumer != null) {
+                               consumer.close();
+                       }
+               }
+               if(partitionInfos.isEmpty()) {
+                       throw new RuntimeException("Unable to retrieve any 
partitions for the requested topics " + topics);
+               }
+
+               // we now have a list of partitions which is the same for all 
parallel consumer instances.
+               LOG.info("Got {} partitions from these topics: {}", 
partitionInfos.size(), topics);
+
+               if (LOG.isInfoEnabled()) {
+                       logPartitionInfo(partitionInfos);
+               }
+       }
+
+
+       /**
+        * Converts a list of Kafka PartitionInfo's to Flink's 
KafkaTopicPartition (which are serializable)
+        * @param partitions A list of Kafka PartitionInfos.
+        * @return A list of KafkaTopicPartitions
+        */
+       public static List<KafkaTopicPartition> 
convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) {
+               checkNotNull(partitions, "The given list of partitions was 
null");
+               List<KafkaTopicPartition> ret = new 
ArrayList<>(partitions.size());
+               for(PartitionInfo pi: partitions) {
+                       ret.add(new KafkaTopicPartition(pi.topic(), 
pi.partition()));
+               }
+               return ret;
+       }
+
+       public static List<TopicPartition> 
convertToKafkaTopicPartition(List<KafkaTopicPartition> partitions) {
+               List<TopicPartition> ret = new ArrayList<>(partitions.size());
+               for(KafkaTopicPartition ktp: partitions) {
+                       ret.add(new TopicPartition(ktp.getTopic(), 
ktp.getPartition()));
+               }
+               return ret;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Source life cycle
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+
+               final int numConsumers = 
getRuntimeContext().getNumberOfParallelSubtasks();
+               final int thisConsumerIndex = 
getRuntimeContext().getIndexOfThisSubtask();
+
+               // pick which partitions we work on
+               this.subscribedPartitionsAsFlink = 
assignPartitions(this.partitionInfos, numConsumers, thisConsumerIndex);
+               if(this.subscribedPartitionsAsFlink.isEmpty()) {
+                       LOG.info("This consumer doesn't have any partitions 
assigned");
+                       this.offsetsState = null;
+                       return;
+               } else {
+                       StreamingRuntimeContext streamingRuntimeContext = 
(StreamingRuntimeContext) getRuntimeContext();
+                       // if checkpointing is enabled, we are not 
automatically committing to Kafka.
+                       
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
Boolean.toString(!streamingRuntimeContext.isCheckpointingEnabled()));
+                       this.consumer = new KafkaConsumer<>(properties);
+               }
+               subscribedPartitions = 
convertToKafkaTopicPartition(subscribedPartitionsAsFlink);
+
+               this.consumer.assign(this.subscribedPartitions);
+
+               // register Kafka metrics to Flink accumulators
+               
if(!Boolean.parseBoolean(properties.getProperty(KEY_DISABLE_METRICS, "false"))) {
+                       Map<MetricName, ? extends Metric> metrics = 
this.consumer.metrics();
+                       if(metrics == null) {
+                               // MapR's Kafka implementation returns null 
here.
+                               LOG.info("Consumer implementation does not 
support metrics");
+                       } else {
+                               for (Map.Entry<MetricName, ? extends Metric> 
metric : metrics.entrySet()) {
+                                       String name = "consumer-" + 
metric.getKey().name();
+                                       DefaultKafkaMetricAccumulator 
kafkaAccumulator = DefaultKafkaMetricAccumulator.createFor(metric.getValue());
+                                       // best effort: we only add the 
accumulator if available.
+                                       if (kafkaAccumulator != null) {
+                                               
getRuntimeContext().addAccumulator(name, kafkaAccumulator);
+                                       }
+                               }
+                       }
+               }
+
+               // check if we need to explicitly seek to a specific offset 
(restore case)
+               if(restoreToOffset != null) {
+                       // we are in a recovery scenario
+                       for(Map.Entry<KafkaTopicPartition, Long> offset: 
restoreToOffset.entrySet()) {
+                               // seek all offsets to the right position
+                               this.consumer.seek(new 
TopicPartition(offset.getKey().getTopic(), offset.getKey().getPartition()), 
offset.getValue() + 1);
+                       }
+                       this.offsetsState = restoreToOffset;
+               } else {
+                       this.offsetsState = new HashMap<>();
+               }
+       }
+
+
+
+       @Override
+       public void run(SourceContext<T> sourceContext) throws Exception {
+               if(consumer != null) {
+                       consumerThread = new ConsumerThread<>(this, 
sourceContext);
+                       consumerThread.start();
+                       // wait for the consumer to stop
+                       while(consumerThread.isAlive()) {
+                               if(consumerThreadException != null) {
+                                       throw new 
RuntimeException("ConsumerThread threw an exception", consumerThreadException);
+                               }
+                               try {
+                                       consumerThread.join(50);
+                               } catch (InterruptedException ie) {
+                                       consumerThread.shutdown();
+                               }
+                       }
+                       // check again for an exception
+                       if(consumerThreadException != null) {
+                               throw new RuntimeException("ConsumerThread 
threw an exception", consumerThreadException);
+                       }
+               } else {
+                       // this source never completes, so emit a 
Long.MAX_VALUE watermark
+                       // to not block watermark forwarding
+                       if 
(getRuntimeContext().getExecutionConfig().areTimestampsEnabled()) {
+                               sourceContext.emitWatermark(new 
Watermark(Long.MAX_VALUE));
+                       }
+
+                       final Object waitLock = new Object();
+                       this.waitThread = Thread.currentThread();
+                       while (running) {
+                               // wait until we are canceled
+                               try {
+                                       //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
+                                       synchronized (waitLock) {
+                                               waitLock.wait();
+                                       }
+                               }
+                               catch (InterruptedException e) {
+                                       // do nothing, check our "running" 
status
+                               }
+                       }
+               }
+               // close the context after the work was done. this can actually 
only
+               // happen when the fetcher decides to stop fetching
+               sourceContext.close();
+       }
+
+       @Override
+       public void cancel() {
+               // set ourselves as not running
+               running = false;
+               if(this.consumerThread != null) {
+                       this.consumerThread.shutdown();
+               } else {
+                       // the consumer thread is not running, so we have to 
interrupt our own thread
+                       if(waitThread != null) {
+                               waitThread.interrupt();
+                       }
+               }
+       }
+
+       @Override
+       public void close() throws Exception {
+               cancel();
+               super.close();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Checkpoint and restore
+       // 
------------------------------------------------------------------------
+
+
+       @Override
+       protected void commitOffsets(HashMap<KafkaTopicPartition, Long> 
checkpointOffsets) {
+               Map<TopicPartition, OffsetAndMetadata> kafkaCheckpointOffsets = 
convertToCommitMap(checkpointOffsets);
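+               // the KafkaConsumer is not thread-safe: synchronize with the ConsumerThread, which locks the same consumer instance around poll()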
+               synchronized (this.consumer) {
+                       this.consumer.commitSync(kafkaCheckpointOffsets);
+               }
+       }
+
+       public static Map<TopicPartition, OffsetAndMetadata> 
convertToCommitMap(HashMap<KafkaTopicPartition, Long> checkpointOffsets) {
+               Map<TopicPartition, OffsetAndMetadata> ret = new 
HashMap<>(checkpointOffsets.size());
+               for(Map.Entry<KafkaTopicPartition, Long> partitionOffset: 
checkpointOffsets.entrySet()) {
+                       ret.put(new 
TopicPartition(partitionOffset.getKey().getTopic(), 
partitionOffset.getKey().getPartition()),
+                                       new 
OffsetAndMetadata(partitionOffset.getValue(), ""));
+               }
+               return ret;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Miscellaneous utilities 
+       // 
------------------------------------------------------------------------
+
+
+       protected static void setDeserializer(Properties props) {
+               if 
(!props.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
+                       props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getCanonicalName());
+               } else {
+                       LOG.warn("Overwriting the '{}' setting is not recommended", 
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+               }
+
+               if 
(!props.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
+                       
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getCanonicalName());
+               } else {
+                       LOG.warn("Overwriting the '{}' setting is not recommended", 
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+               }
+       }
+
+       /**
+        * We use a separate thread for executing the 
KafkaConsumer.poll(timeout) call because Kafka does not
+        * handle interrupts properly. On an interrupt (which Flink triggers 
automatically if the task
+        * doesn't react to cancel() calls), the poll() method might never 
return.
+        * On cancel, we'll wake up the .poll() call and wait for it to return.
+        */
+       private static class ConsumerThread<T> extends Thread {
+               private final FlinkKafkaConsumer09<T> flinkKafkaConsumer;
+               private final SourceContext<T> sourceContext;
+               private volatile boolean running = true;
+
+               public ConsumerThread(FlinkKafkaConsumer09<T> 
flinkKafkaConsumer, SourceContext<T> sourceContext) {
+                       this.flinkKafkaConsumer = flinkKafkaConsumer;
+                       this.sourceContext = sourceContext;
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               long pollTimeout = 
Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, 
Long.toString(DEFAULT_POLL_TIMEOUT)));
+                               pollLoop: while (running) {
+                                       ConsumerRecords<byte[], byte[]> records;
+                                       //noinspection 
SynchronizeOnNonFinalField
+                                       synchronized 
(flinkKafkaConsumer.consumer) {
+                                               try {
+                                                       records = 
flinkKafkaConsumer.consumer.poll(pollTimeout);
+                                               } catch (WakeupException we) {
+                                                       if (running) {
+                                                               throw we;
+                                                       }
+                                                       // leave loop
+                                                       continue;
+                                               }
+                                       }
+                                       // get the records for each topic 
partition
+                                       for (int i = 0; i < 
flinkKafkaConsumer.subscribedPartitions.size(); i++) {
+                                               TopicPartition partition = 
flinkKafkaConsumer.subscribedPartitions.get(i);
+                                               KafkaTopicPartition 
flinkPartition = flinkKafkaConsumer.subscribedPartitionsAsFlink.get(i);
+                                               List<ConsumerRecord<byte[], 
byte[]>> partitionRecords = records.records(partition);
+                                               //noinspection 
ForLoopReplaceableByForEach
+                                               for (int j = 0; j < 
partitionRecords.size(); j++) {
+                                                       ConsumerRecord<byte[], 
byte[]> record = partitionRecords.get(j);
+                                                       T value = 
flinkKafkaConsumer.deserializer.deserialize(record.key(), record.value(), 
record.topic(), record.partition(),record.offset());
+                                                       
if(flinkKafkaConsumer.deserializer.isEndOfStream(value)) {
+                                                               // end of 
stream signaled
+                                                               running = false;
+                                                               break pollLoop;
+                                                       }
+                                                       synchronized 
(sourceContext.getCheckpointLock()) {
+                                                               
sourceContext.collect(value);
+                                                               
flinkKafkaConsumer.offsetsState.put(flinkPartition, record.offset());
+                                                       }
+                                               }
+                                       }
+                               }
+                       } catch(Throwable t) {
+                               if(running) {
+                                       
this.flinkKafkaConsumer.stopWithError(t);
+                               } else {
+                                       LOG.debug("Stopped ConsumerThread threw 
exception", t);
+                               }
+                       } finally {
+                               try {
+                                       flinkKafkaConsumer.consumer.close();
+                               } catch(Throwable t) {
+                                       LOG.warn("Error while closing 
consumer", t);
+                               }
+                       }
+               }
+
+               /**
+                * Try to shutdown the thread
+                */
+               public void shutdown() {
+                       this.running = false;
+                       this.flinkKafkaConsumer.consumer.wakeup();
+               }
+       }
+
+       private void stopWithError(Throwable t) {
+               this.consumerThreadException = t;
+       }
+}
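A side note on the ConsumerThread above: KafkaConsumer.poll() does not react to Thread.interrupt(), so the connector relies on KafkaConsumer.wakeup(), which makes a blocked poll() throw a WakeupException. The following standalone sketch of that shutdown protocol is not part of the patch; the broker address, group id, topic name and class name are placeholders.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class WakeupSketch {
	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
		props.setProperty("group.id", "wakeup-sketch");           // placeholder group
		props.setProperty("key.deserializer", ByteArrayDeserializer.class.getCanonicalName());
		props.setProperty("value.deserializer", ByteArrayDeserializer.class.getCanonicalName());

		final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic

		Thread poller = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					while (true) {
						consumer.poll(100); // may block; does not react to interrupt()
					}
				} catch (WakeupException we) {
					// expected: wakeup() was called from the other thread
				} finally {
					consumer.close();
				}
			}
		});
		poller.start();

		Thread.sleep(1000);
		consumer.wakeup(); // the one thread-safe call on KafkaConsumer: unblocks poll()
		poller.join();
	}
}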

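For orientation, a minimal sketch of how the consumer above could be wired into a job, including the two Flink-specific keys it defines (flink.poll-timeout and flink.disable-metrics). The broker address, group id, topic name and class name are illustrative placeholders, not values from the patch.

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ConsumerUsageSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// with checkpointing enabled, offsets are snapshotted by Flink and Kafka auto-commit is turned off
		env.enableCheckpointing(5000);

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");              // placeholder broker
		props.setProperty("group.id", "my-consumer-group");                    // placeholder group
		props.setProperty(FlinkKafkaConsumer09.KEY_POLL_TIMEOUT, "250");       // default is 100 ms
		props.setProperty(FlinkKafkaConsumer09.KEY_DISABLE_METRICS, "false");  // "true" would skip registering Kafka metrics as accumulators

		DataStream<String> stream = env.addSource(
				new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props)); // placeholder topic
		stream.print();

		env.execute("Consumer usage sketch");
	}
}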
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
new file mode 100644
index 0000000..6f7f687
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible 
with Kafka 0.9.
+ *
+ * Please note that this producer does not have any reliability guarantees.
+ *
+ * @param <IN> Type of the messages to write into Kafka.
+ */
+public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
+
+       private static final long serialVersionUID = 1L;
+
+       // ------------------- Keyless serialization schema constructors 
----------------------
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink writes a 
DataStream to
+        * the topic.
+        *
+        * @param brokerList
+        *                      Comma separated addresses of the brokers
+        * @param topicId
+        *                      ID of the Kafka topic.
+        * @param serializationSchema
+        *                      User defined (keyless) serialization schema.
+        */
+       public FlinkKafkaProducer09(String brokerList, String topicId, 
SerializationSchema<IN> serializationSchema) {
+               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), 
getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink writes a 
DataStream to
+        * the topic.
+        *
+        * @param topicId
+        *                      ID of the Kafka topic.
+        * @param serializationSchema
+        *                      User defined (keyless) serialization schema.
+        * @param producerConfig
+        *                      Properties with the producer configuration.
+        */
+       public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig) {
+               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
FixedPartitioner<IN>());
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink writes a 
DataStream to
+        * the topic.
+        *
+        * @param topicId The topic to write data to
+        * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
+        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+        */
+       public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, KafkaPartitioner<IN> 
customPartitioner) {
+               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
+
+       }
+
+       // ------------------- Key/Value serialization schema constructors 
----------------------
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink writes a 
DataStream to
+        * the topic.
+        *
+        * @param brokerList
+        *                      Comma separated addresses of the brokers
+        * @param topicId
+        *                      ID of the Kafka topic.
+        * @param serializationSchema
+        *                      User defined serialization schema supporting 
key/value messages
+        */
+       public FlinkKafkaProducer09(String brokerList, String topicId, 
KeyedSerializationSchema<IN> serializationSchema) {
+               this(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink writes a 
DataStream to
+        * the topic.
+        *
+        * @param topicId
+        *                      ID of the Kafka topic.
+        * @param serializationSchema
+        *                      User defined serialization schema supporting 
key/value messages
+        * @param producerConfig
+        *                      Properties with the producer configuration.
+        */
+       public FlinkKafkaProducer09(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
+               this(topicId, serializationSchema, producerConfig, new 
FixedPartitioner<IN>());
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink writes a 
DataStream to
+        * the topic.
+        *
+        * @param topicId The topic to write data to
+        * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
+        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
+        */
+       public FlinkKafkaProducer09(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
KafkaPartitioner<IN> customPartitioner) {
+               super(topicId, serializationSchema, producerConfig, 
customPartitioner);
+       }
+}

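A short sketch of the broker-list convenience constructor above, which builds the producer configuration from a comma-separated broker string. The broker address, topic name and class name are placeholders, and, as the Javadoc notes, this producer offers no reliability guarantees.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ProducerUsageSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<String> messages = env.fromElements("a", "b", "c");

		// (brokerList, topicId, SerializationSchema) constructor; properties are derived from the broker list
		messages.addSink(new FlinkKafkaProducer09<String>("localhost:9092", "my-topic", new SimpleStringSchema()));

		env.execute("Producer usage sketch");
	}
}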
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java
new file mode 100644
index 0000000..643da66
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.examples;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+
+/**
+ * Read Strings from Kafka and print them to standard out.
+ * Note: On a cluster, DataStream.print() will print to the TaskManager's .out 
file!
+ *
+ * Please pass the following arguments to run the example:
+ *     --topic test --bootstrap.servers localhost:9092 --group.id myconsumer
+ *
+ */
+public class ReadFromKafka {
+
+       public static void main(String[] args) throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().disableSysoutLogging();
+               env.setNumberOfExecutionRetries(4);
+               env.enableCheckpointing(5000);
+               env.setParallelism(2);
+
+               ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+               DataStream<String> messageStream = env
+                               .addSource(new FlinkKafkaConsumer09<>(
+                                               
parameterTool.getRequired("topic"),
+                                               new SimpleStringSchema(),
+                                               parameterTool.getProperties()));
+
+               messageStream.print();
+
+               env.execute("Read from Kafka example");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java
new file mode 100644
index 0000000..fbe53fa
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.examples;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+
+/**
+ * Generate a String every 500 ms and write it into a Kafka topic
+ *
+ * Please pass the following arguments to run the example:
+ *     --topic test --bootstrap.servers localhost:9092
+ *
+ */
+public class WriteIntoKafka {
+
+       public static void main(String[] args) throws Exception {
+               StreamExecutionEnvironment env =
+                               
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().disableSysoutLogging();
+               env.setNumberOfExecutionRetries(4);
+               env.setParallelism(2);
+
+               ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+               // very simple data generator
+               DataStream<String> messageStream = env.addSource(new 
SourceFunction<String>() {
+                       public volatile boolean running = true;
+
+                       @Override
+                       public void run(SourceContext<String> ctx) throws 
Exception {
+                               long i = 0;
+                               while(this.running) {
+                                       ctx.collect("Element - " + i++);
+                                       Thread.sleep(500);
+                               }
+                       }
+
+                       @Override
+                       public void cancel() {
+                               running = false;
+                       }
+               });
+
+               // write data into Kafka
+               messageStream.addSink(new 
FlinkKafkaProducer09<>(parameterTool.getRequired("topic"), new 
SimpleStringSchema(), parameterTool.getProperties()));
+
+               env.execute("Write into Kafka example");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6bdfb48
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
new file mode 100644
index 0000000..55abaaa
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Test;
+
+
+public class Kafka09ITCase extends KafkaConsumerTestBase {
+
+       // 
------------------------------------------------------------------------
+       //  Suite of Tests
+       // 
------------------------------------------------------------------------
+
+       @Test(timeout = 60000)
+       public void testCheckpointing() throws Exception {
+               runCheckpointingTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testFailOnNoBroker() throws Exception {
+               runFailOnNoBrokerTest();
+       }
+
+
+       @Test(timeout = 60000)
+       public void testConcurrentProducerConsumerTopology() throws Exception {
+               runSimpleConcurrentProducerConsumerTopology();
+       }
+
+       @Test(timeout = 60000)
+       public void testKeyValueSupport() throws Exception {
+               runKeyValueTest();
+       }
+
+       // --- canceling / failures ---
+
+       @Test(timeout = 60000)
+       public void testCancelingEmptyTopic() throws Exception {
+               runCancelingOnEmptyInputTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testCancelingFullTopic() throws Exception {
+               runCancelingOnFullInputTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testFailOnDeploy() throws Exception {
+               runFailOnDeployTest();
+       }
+
+
+       // --- source to partition mappings and exactly once ---
+
+       @Test(timeout = 60000)
+       public void testOneToOneSources() throws Exception {
+               runOneToOneExactlyOnceTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testOneSourceMultiplePartitions() throws Exception {
+               runOneSourceMultiplePartitionsExactlyOnceTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testMultipleSourcesOnePartition() throws Exception {
+               runMultipleSourcesOnePartitionExactlyOnceTest();
+       }
+
+       // --- broker failure ---
+
+       @Test(timeout = 60000)
+       public void testBrokerFailure() throws Exception {
+               runBrokerFailureTest();
+       }
+
+       // --- special executions ---
+
+       @Test(timeout = 60000)
+       public void testBigRecordJob() throws Exception {
+               runBigRecordTestTopology();
+       }
+
+       @Test(timeout = 60000)
+       public void testMultipleTopics() throws Exception {
+               runConsumeMultipleTopics();
+       }
+
+       @Test(timeout = 60000)
+       public void testAllDeletes() throws Exception {
+               runAllDeletesTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testMetricsAndEndOfStream() throws Exception {
+               runMetricsAndEndOfStreamTest();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
new file mode 100644
index 0000000..1288347
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import org.junit.Test;
+
+
+@SuppressWarnings("serial")
+public class Kafka09ProducerITCase extends KafkaProducerTestBase {
+
+       @Test
+       public void testCustomPartitioning() {
+               runCustomPartitioningTest();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
new file mode 100644
index 0000000..a2c4f73
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.TestLogger;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FlinkKafkaProducerBase.class)
+public class KafkaProducerTest extends TestLogger {
+       
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testPropagateExceptions() {
+               try {
+                       // mock kafka producer
+                       KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
+
+                       // partition setup
+                       when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
+                                       Arrays.asList(new PartitionInfo("mock_topic", 42, null, null, null)));
+
+                       // failure when trying to send an element
+                       when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
+                               .thenAnswer(new Answer<Future<RecordMetadata>>() {
+                                       @Override
+                                       public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
+                                               Callback callback = (Callback) invocation.getArguments()[1];
+                                               callback.onCompletion(null, new Exception("Test error"));
+                                               return null;
+                                       }
+                               });
+
+                       // make sure the FlinkKafkaProducer instantiates our mock producer
+                       whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
+
+                       // (1) producer that propagates errors
+
+                       FlinkKafkaProducer09<String> producerPropagating = new FlinkKafkaProducer09<>(
+                                       "mock_topic", new SimpleStringSchema(), new Properties(), null);
+
+                       producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3));
+                       producerPropagating.open(new Configuration());
+
+                       try {
+                               producerPropagating.invoke("value");
+                               producerPropagating.invoke("value");
+                               fail("This should fail with an exception");
+                       }
+                       catch (Exception e) {
+                               assertNotNull(e.getCause());
+                               assertNotNull(e.getCause().getMessage());
+                               assertTrue(e.getCause().getMessage().contains("Test error"));
+                       }
+
+                       // (2) producer that only logs errors
+
+                       FlinkKafkaProducer09<String> producerLogging = new FlinkKafkaProducer09<>(
+                                       "mock_topic", new SimpleStringSchema(), new Properties(), null);
+                       producerLogging.setLogFailuresOnly(true);
+
+                       producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3));
+                       producerLogging.open(new Configuration());
+
+                       producerLogging.invoke("value");
+                       producerLogging.invoke("value");
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}
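
For reference, the two cases exercised by this test correspond to the following user-facing wiring. This is an illustrative sketch only, not part of the patch: the broker address and topic name are placeholders, and a reachable Kafka 0.9 broker is assumed because the producer fetches partition metadata when it is constructed.

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class ProducerFailureModeSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> stream = env.fromElements("a", "b", "c");

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address

            // default mode (case (1) above): asynchronous send errors are rethrown
            // on the next invoke()/close() call and fail the job
            FlinkKafkaProducer09<String> sink =
                    new FlinkKafkaProducer09<>("example-topic", new SimpleStringSchema(), props, null);

            // logging-only mode (case (2) above): errors are logged and the job keeps running
            // sink.setLogFailuresOnly(true);

            stream.addSink(sink);
            env.execute("Kafka 0.9 producer failure-mode sketch");
        }
    }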

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
new file mode 100644
index 0000000..0855ba6
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.common.KafkaException;
+import kafka.consumer.ConsumerConfig;
+import kafka.api.PartitionMetadata;
+import kafka.network.SocketServer;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.SystemTime$;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.Seq;
+
+import java.io.File;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * An implementation of the KafkaServerProvider for Kafka 0.9
+ */
+public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
+
+       protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+       private File tmpZkDir;
+       private File tmpKafkaParent;
+       private List<File> tmpKafkaDirs;
+       private List<KafkaServer> brokers;
+       private TestingServer zookeeper;
+       private String zookeeperConnectionString;
+       private String brokerConnectionString = "";
+       private Properties standardProps;
+       private ConsumerConfig standardCC;
+
+
+       public String getBrokerConnectionString() {
+               return brokerConnectionString;
+       }
+
+       @Override
+       public ConsumerConfig getStandardConsumerConfig() {
+               return standardCC;
+       }
+
+       @Override
+       public Properties getStandardProperties() {
+               return standardProps;
+       }
+
+       @Override
+       public String getVersion() {
+               return "0.9";
+       }
+
+       @Override
+       public List<KafkaServer> getBrokers() {
+               return brokers;
+       }
+
+       @Override
+       public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
+               return new FlinkKafkaConsumer09<>(topics, readSchema, props);
+       }
+
+       @Override
+       public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+               return new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
+       }
+
+       @Override
+       public void restartBroker(int leaderId) throws Exception {
+               brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId), KAFKA_HOST, zookeeperConnectionString));
+       }
+
+       @Override
+       public int getLeaderToShutDown(String topic) throws Exception {
+               ZkUtils zkUtils = getZkUtils();
+               try {
+                       PartitionMetadata firstPart = null;
+                       do {
+                               if (firstPart != null) {
+                                       LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
+                                       // not the first try. Sleep a bit
+                                       Thread.sleep(150);
+                               }
+
+                               Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionsMetadata();
+                               firstPart = partitionMetadata.head();
+                       }
+                       while (firstPart.errorCode() != 0);
+
+                       return firstPart.leader().get().id();
+               } finally {
+                       zkUtils.close();
+               }
+       }
+
+       @Override
+       public int getBrokerId(KafkaServer server) {
+               return server.config().brokerId();
+       }
+
+       @Override
+       public void prepare(int numKafkaServers) {
+               File tempDir = new File(System.getProperty("java.io.tmpdir"));
+
+               tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
+               assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
+
+               tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString()));
+               assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
+
+               tmpKafkaDirs = new ArrayList<>(numKafkaServers);
+               for (int i = 0; i < numKafkaServers; i++) {
+                       File tmpDir = new File(tmpKafkaParent, "server-" + i);
+                       assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
+                       tmpKafkaDirs.add(tmpDir);
+               }
+
+               zookeeper = null;
+               brokers = null;
+
+               try {
+                       LOG.info("Starting Zookeeper");
+                       zookeeper = new TestingServer(-1, tmpZkDir);
+                       zookeeperConnectionString = zookeeper.getConnectString();
+
+                       LOG.info("Starting KafkaServer");
+                       brokers = new ArrayList<>(numKafkaServers);
+
+                       for (int i = 0; i < numKafkaServers; i++) {
+                               brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i), KafkaTestEnvironment.KAFKA_HOST, zookeeperConnectionString));
+
+                               SocketServer socketServer = brokers.get(i).socketServer();
+                               brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, socketServer.boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+                       }
+
+                       LOG.info("ZK and KafkaServer started.");
+               }
+               catch (Throwable t) {
+                       t.printStackTrace();
+                       fail("Test setup failed: " + t.getMessage());
+               }
+
+               standardProps = new Properties();
+               standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+               standardProps.setProperty("bootstrap.servers", brokerConnectionString);
+               standardProps.setProperty("group.id", "flink-tests");
+               standardProps.setProperty("auto.commit.enable", "false");
+               standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis.
+               standardProps.setProperty("zookeeper.connection.timeout.ms", "20000");
+               standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
+               standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+
+               Properties consumerConfigProps = new Properties();
+               consumerConfigProps.putAll(standardProps);
+               consumerConfigProps.setProperty("auto.offset.reset", "smallest");
+               standardCC = new ConsumerConfig(consumerConfigProps);
+       }
+
+       @Override
+       public void shutdown() {
+               for (KafkaServer broker : brokers) {
+                       if (broker != null) {
+                               broker.shutdown();
+                       }
+               }
+               brokers.clear();
+
+               if (zookeeper != null) {
+                       try {
+                               zookeeper.stop();
+                       }
+                       catch (Exception e) {
+                               LOG.warn("ZK.stop() failed", e);
+                       }
+                       zookeeper = null;
+               }
+
+               // clean up the temp spaces
+
+               if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+                       try {
+                               FileUtils.deleteDirectory(tmpKafkaParent);
+                       }
+                       catch (Exception e) {
+                               // ignore
+                       }
+               }
+               if (tmpZkDir != null && tmpZkDir.exists()) {
+                       try {
+                               FileUtils.deleteDirectory(tmpZkDir);
+                       }
+                       catch (Exception e) {
+                               // ignore
+                       }
+               }
+       }
+
+       public ZkUtils getZkUtils() {
+               ZkClient creator = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
+                               standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+               return ZkUtils.apply(creator, false);
+       }
+
+       @Override
+       public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
+               // create topic with one client
+               Properties topicConfig = new Properties();
+               LOG.info("Creating topic {}", topic);
+
+               ZkUtils zkUtils = getZkUtils();
+               try {
+                       AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig);
+               } finally {
+                       zkUtils.close();
+               }
+
+               // validate that the topic has been created
+               final long deadline = System.currentTimeMillis() + 30000;
+               do {
+                       try {
+                               Thread.sleep(100);
+                       } catch (InterruptedException e) {
+                               // restore interrupted state
+                       }
+                       // we could use AdminUtils.topicExists(zkUtils, topic) here, but its results are
+                       // not always correct.
+
+                       // create a new ZK utils connection
+                       ZkUtils checkZKConn = getZkUtils();
+                       if(AdminUtils.topicExists(checkZKConn, topic)) {
+                               checkZKConn.close();
+                               return;
+                       }
+                       checkZKConn.close();
+               }
+               while (System.currentTimeMillis() < deadline);
+               fail("Test topic could not be created");
+       }
+
+       @Override
+       public void deleteTestTopic(String topic) {
+               ZkUtils zkUtils = getZkUtils();
+               try {
+                       LOG.info("Deleting topic {}", topic);
+
+                       ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
+                                       standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+
+                       AdminUtils.deleteTopic(zkUtils, topic);
+
+                       zk.close();
+               } finally {
+                       zkUtils.close();
+               }
+       }
+
+       /**
+        * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+        */
+       protected static KafkaServer getKafkaServer(int brokerId, File tmpFolder,
+                                                                                               String kafkaHost,
+                                                                                               String zookeeperConnectionString) throws Exception {
+               Properties kafkaProperties = new Properties();
+
+               // properties have to be Strings
+               kafkaProperties.put("advertised.host.name", kafkaHost);
+               kafkaProperties.put("broker.id", Integer.toString(brokerId));
+               kafkaProperties.put("log.dir", tmpFolder.toString());
+               kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+               kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
+               kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
+
+               // for CI stability, increase zookeeper session timeout
+               kafkaProperties.put("zookeeper.session.timeout.ms", "20000");
+
+               final int numTries = 5;
+
+               for (int i = 1; i <= numTries; i++) {
+                       int kafkaPort = NetUtils.getAvailablePort();
+                       kafkaProperties.put("port", Integer.toString(kafkaPort));
+                       KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+                       try {
+                               scala.Option<String> stringNone = scala.Option.apply(null);
+                               KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone);
+                               server.startup();
+                               return server;
+                       }
+                       catch (KafkaException e) {
+                               if (e.getCause() instanceof BindException) {
+                                       // port conflict, retry...
+                                       LOG.info("Port conflict when starting Kafka Broker. Retrying...");
+                               }
+                               else {
+                                       throw e;
+                               }
+                       }
+               }
+
+               throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
+       }
+
+}
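
A typical integration test drives the environment above through prepare(), createTestTopic(), and shutdown(). The following is a minimal sketch, not part of the patch; the broker count and the topic name are arbitrary placeholders.

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;

    public class EmbeddedKafkaLifecycleSketch {

        public static void main(String[] args) throws Exception {
            KafkaTestEnvironmentImpl kafkaEnv = new KafkaTestEnvironmentImpl();

            // starts one embedded ZooKeeper instance and three brokers in temp directories
            kafkaEnv.prepare(3);
            try {
                kafkaEnv.createTestTopic("sketch-topic", 2, 1);

                // the standard properties already point at the embedded cluster
                Properties props = kafkaEnv.getStandardProperties();
                System.out.println("bootstrap.servers = " + props.getProperty("bootstrap.servers"));

                kafkaEnv.deleteTestTopic("sketch-topic");
            }
            finally {
                kafkaEnv.shutdown();
            }
        }
    }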

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..6bdfb48
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml 
b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
new file mode 100644
index 0000000..354d353
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
@@ -0,0 +1,169 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-streaming-connectors-parent</artifactId>
+               <version>1.0-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-kafka-base</artifactId>
+       <name>flink-connector-kafka-base</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <kafka.version>0.8.2.0</kafka.version>
+       </properties>
+
+       <dependencies>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka_${scala.binary.version}</artifactId>
+                       <version>${kafka.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>com.sun.jmx</groupId>
+                                       <artifactId>jmxri</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>com.sun.jdmk</groupId>
+                                       <artifactId>jmxtools</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>log4j</groupId>
+                                       <artifactId>log4j</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.slf4j</groupId>
+                                       <artifactId>slf4j-simple</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>net.sf.jopt-simple</groupId>
+                                       <artifactId>jopt-simple</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.scala-lang</groupId>
+                                       <artifactId>scala-reflect</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.scala-lang</groupId>
+                                       <artifactId>scala-compiler</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>com.yammer.metrics</groupId>
+                                       <artifactId>metrics-annotation</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.xerial.snappy</groupId>
+                                       <artifactId>snappy-java</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <!-- force using the latest zkclient -->
+               <dependency>
+                       <groupId>com.101tec</groupId>
+                       <artifactId>zkclient</artifactId>
+                       <version>0.7</version>
+                       <type>jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+                       <version>${guava.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.curator</groupId>
+                       <artifactId>curator-test</artifactId>
+                       <version>${curator.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-tests</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+       </dependencies>
+
+       <dependencyManagement>
+               <dependencies>
+                       <dependency>
+                               <groupId>com.101tec</groupId>
+                               <artifactId>zkclient</artifactId>
+                               <version>0.7</version>
+                       </dependency>
+               </dependencies>
+       </dependencyManagement>
+       
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-failsafe-plugin</artifactId>
+                               <configuration>
+                                       <!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
+                                       <forkCount>1</forkCount>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+       
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
new file mode 100644
index 0000000..3c36586
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
+               implements CheckpointNotifier, CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, ResultTypeQueryable<T> {
+
+       // ------------------------------------------------------------------------
+
+       private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
+
+       private static final long serialVersionUID = -6272159445203409112L;
+
+       /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */
+       public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+       /** The schema to convert between Kafka's byte messages and Flink's objects */
+       protected final KeyedDeserializationSchema<T> deserializer;
+
+       // ------  Runtime State  -------
+
+       /** Data for pending but uncommitted checkpoints */
+       protected final LinkedMap pendingCheckpoints = new LinkedMap();
+
+       /** The offsets of the last returned elements */
+       protected transient HashMap<KafkaTopicPartition, Long> offsetsState;
+
+       /** The offsets to restore to, if the consumer restores state from a checkpoint */
+       protected transient HashMap<KafkaTopicPartition, Long> restoreToOffset;
+
+       /** Flag indicating whether the consumer is still running */
+       protected volatile boolean running = true;
+
+       // ------------------------------------------------------------------------
+
+
+       /**
+        * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
+        *
+        * <p>To determine which kind of fetcher and offset handler to use, please refer to the docs
+        * at the beginning of this class.</p>
+        *
+        * @param deserializer
+        *           The deserializer to turn raw byte messages into Java/Scala objects.
+        * @param props
+        *           The properties that are used to configure both the fetcher and the offset handler.
+        */
+       public FlinkKafkaConsumerBase(KeyedDeserializationSchema<T> deserializer, Properties props) {
+               this.deserializer = checkNotNull(deserializer, "valueDeserializer");
+       }
+
+       // ------------------------------------------------------------------------
+       //  Checkpoint and restore
+       // ------------------------------------------------------------------------
+
+       @Override
+       public HashMap<KafkaTopicPartition, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+               if (offsetsState == null) {
+                       LOG.debug("snapshotState() requested on not yet opened source; returning null.");
+                       return null;
+               }
+               if (!running) {
+                       LOG.debug("snapshotState() called on closed source");
+                       return null;
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Snapshotting state. Offsets: {}, checkpoint id: {}, timestamp: {}",
+                                       KafkaTopicPartition.toString(offsetsState), checkpointId, checkpointTimestamp);
+               }
+
+               // the use of clone() is okay here, we just need a new map, the keys are not changed
+               //noinspection unchecked
+               HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) offsetsState.clone();
+
+               // the map cannot be asynchronously updated, because only one checkpoint call can happen
+               // on this function at a time: either snapshotState() or notifyCheckpointComplete()
+               pendingCheckpoints.put(checkpointId, currentOffsets);
+
+               while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+                       pendingCheckpoints.remove(0);
+               }
+
+               return currentOffsets;
+       }
+
+       @Override
+       public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
+               LOG.info("Setting restore state in Kafka");
+               restoreToOffset = restoredOffsets;
+       }
+
+       @Override
+       public void notifyCheckpointComplete(long checkpointId) throws Exception {
+               if (offsetsState == null) {
+                       LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+                       return;
+               }
+               if (!running) {
+                       LOG.debug("notifyCheckpointComplete() called on closed source");
+                       return;
+               }
+               
+               // only one commit operation must be in progress
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Committing offsets externally for checkpoint {}", checkpointId);
+               }
+
+               try {
+                       HashMap<KafkaTopicPartition, Long> checkpointOffsets;
+       
+                       // the map may be asynchronously updated when snapshotting state, so we synchronize
+                       synchronized (pendingCheckpoints) {
+                               final int posInMap = pendingCheckpoints.indexOf(checkpointId);
+                               if (posInMap == -1) {
+                                       LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+                                       return;
+                               }
+
+                               //noinspection unchecked
+                               checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
+
+                               
+                               // remove older checkpoints in map
+                               for (int i = 0; i < posInMap; i++) {
+                                       pendingCheckpoints.remove(0);
+                               }
+                       }
+                       if (checkpointOffsets == null || checkpointOffsets.size() == 0) {
+                               LOG.debug("Checkpoint state was empty.");
+                               return;
+                       }
+                       commitOffsets(checkpointOffsets);
+               }
+               catch (Exception e) {
+                       if (running) {
+                               throw e;
+                       }
+                       // else ignore exception if we are no longer running
+               }
+       }
+
+       protected abstract void commitOffsets(HashMap<KafkaTopicPartition, Long> checkpointOffsets) throws Exception;
+
+
+       @Override
+       public TypeInformation<T> getProducedType() {
+               return deserializer.getProducedType();
+       }
+
+       protected static <T> List<T> assignPartitions(List<T> partitions, int numConsumers, int consumerIndex) {
+               checkArgument(numConsumers > 0);
+               checkArgument(consumerIndex < numConsumers);
+
+               List<T> partitionsToSub = new ArrayList<>();
+
+               for (int i = 0; i < partitions.size(); i++) {
+                       if (i % numConsumers == consumerIndex) {
+                               partitionsToSub.add(partitions.get(i));
+                       }
+               }
+               return partitionsToSub;
+       }
+
+       /**
+        * Method to log partition information.
+        * @param partitionInfos List of subscribed partitions
+        */
+       public static void logPartitionInfo(List<KafkaTopicPartition> partitionInfos) {
+               Map<String, Integer> countPerTopic = new HashMap<>();
+               for (KafkaTopicPartition partition : partitionInfos) {
+                       Integer count = countPerTopic.get(partition.getTopic());
+                       if (count == null) {
+                               count = 1;
+                       } else {
+                               count++;
+                       }
+                       countPerTopic.put(partition.getTopic(), count);
+               }
+               StringBuilder sb = new StringBuilder();
+               for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) {
+                       sb.append(e.getKey()).append(" (").append(e.getValue()).append("), ");
+               }
+               LOG.info("Consumer is going to read the following topics (with number of partitions): {}", sb.toString());
+       }
+
+
+}
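
The assignPartitions() helper above distributes partitions over parallel subtasks with a plain modulo rule: partition i is read by the subtask with index i % numConsumers. Since the method is protected, the sketch below (not part of the patch) re-implements the same rule standalone, just to show the resulting assignment.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class PartitionAssignmentSketch {

        // same modulo rule as FlinkKafkaConsumerBase#assignPartitions, re-implemented for illustration
        static <T> List<T> assign(List<T> partitions, int numConsumers, int consumerIndex) {
            List<T> assigned = new ArrayList<>();
            for (int i = 0; i < partitions.size(); i++) {
                if (i % numConsumers == consumerIndex) {
                    assigned.add(partitions.get(i));
                }
            }
            return assigned;
        }

        public static void main(String[] args) {
            List<String> partitions = Arrays.asList("topic-0", "topic-1", "topic-2", "topic-3", "topic-4");
            for (int subtask = 0; subtask < 3; subtask++) {
                // subtask 0 -> [topic-0, topic-3], subtask 1 -> [topic-1, topic-4], subtask 2 -> [topic-2]
                System.out.println("subtask " + subtask + " reads " + assign(partitions, 3, subtask));
            }
        }
    }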

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
new file mode 100644
index 0000000..ebc02c9
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic.
+ *
+ * Please note that this producer does not have any reliability guarantees.
+ *
+ * @param <IN> Type of the messages to write into Kafka.
+ */
+public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> {
+
+       private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Configuration key for disabling the metrics reporting
+        */
+       public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+
+       /**
+        * Array with the partition ids of the given topicId
+        * The size of this array is the number of partitions
+        */
+       protected final int[] partitions;
+
+       /**
+        * User defined properties for the Producer
+        */
+       protected final Properties producerConfig;
+
+       /**
+        * The name of the topic this producer is writing data to
+        */
+       protected final String topicId;
+
+       /**
+        * (Serializable) SerializationSchema for turning objects used with Flink into
+        * byte[] for Kafka.
+        */
+       protected final KeyedSerializationSchema<IN> schema;
+
+       /**
+        * User-provided partitioner for assigning an object to a Kafka partition.
+        */
+       protected final KafkaPartitioner<IN> partitioner;
+
+       /**
+        * Flag indicating whether to accept failures (and log them), or to fail on failures
+        */
+       protected boolean logFailuresOnly;
+       
+       // -------------------------------- Runtime fields ------------------------------------------
+
+       /** KafkaProducer instance */
+       protected transient KafkaProducer<byte[], byte[]> producer;
+
+       /** The callback that handles error propagation or logging callbacks */
+       protected transient Callback callback;
+       
+       /** Errors encountered in the async producer are stored here */
+       protected transient volatile Exception asyncException;
+
+
+       /**
+        * The main constructor for creating a FlinkKafkaProducer.
+        *
+        * @param topicId The topic to write data to
+        * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+        * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner
+        */
+       public FlinkKafkaProducerBase(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+               Preconditions.checkNotNull(topicId, "TopicID not set");
+               Preconditions.checkNotNull(serializationSchema, "serializationSchema not set");
+               Preconditions.checkNotNull(producerConfig, "producerConfig not set");
+               ClosureCleaner.ensureSerializable(customPartitioner);
+               ClosureCleaner.ensureSerializable(serializationSchema);
+
+               this.topicId = topicId;
+               this.schema = serializationSchema;
+               this.producerConfig = producerConfig;
+
+               // set the producer configuration properties.
+
+               if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+                       this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+               } else {
+                       LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+               }
+
+               if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+                       this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+               } else {
+                       LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+               }
+
+
+               // create a local KafkaProducer to get the list of partitions.
+               // this will also ensure locally that all required ProducerConfig values are set.
+               try (KafkaProducer<Void, IN> getPartitionsProd = new KafkaProducer<>(this.producerConfig)) {
+                       List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(topicId);
+
+                       this.partitions = new int[partitionsList.size()];
+                       for (int i = 0; i < partitions.length; i++) {
+                               partitions[i] = partitionsList.get(i).partition();
+                       }
+                       getPartitionsProd.close();
+               }
+
+               this.partitioner = customPartitioner;
+       }
+
+       // ---------------------------------- Properties --------------------------
+
+       /**
+        * Defines whether the producer should fail on errors, or only log them.
+        * If this is set to true, exceptions will only be logged; if set to false,
+        * exceptions will eventually be thrown and cause the streaming program to
+        * fail (and enter recovery).
+        *
+        * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+        */
+       public void setLogFailuresOnly(boolean logFailuresOnly) {
+               this.logFailuresOnly = logFailuresOnly;
+       }
+
+       // ----------------------------------- Utilities --------------------------
+       
+       /**
+        * Initializes the connection to Kafka.
+        */
+       @Override
+       public void open(Configuration configuration) {
+               producer = new org.apache.kafka.clients.producer.KafkaProducer<>(this.producerConfig);
+
+               RuntimeContext ctx = getRuntimeContext();
+               if (partitioner != null) {
+                       partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
+               }
+
+               LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}",
+                               ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), topicId);
+
+               // register Kafka metrics to Flink accumulators
+               if (!Boolean.getBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
+                       Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
+
+                       if (metrics == null) {
+                               // MapR's Kafka implementation returns null here.
+                               LOG.info("Producer implementation does not support metrics");
+                       } else {
+                               for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
+                                       String name = "producer-" + metric.getKey().name();
+                                       DefaultKafkaMetricAccumulator kafkaAccumulator = DefaultKafkaMetricAccumulator.createFor(metric.getValue());
+                                       // best effort: we only add the accumulator if available.
+                                       if (kafkaAccumulator != null) {
+                                               getRuntimeContext().addAccumulator(name, kafkaAccumulator);
+                                       }
+                               }
+                       }
+               }
+
+               if (logFailuresOnly) {
+                       callback = new Callback() {
+                               
+                               @Override
+                               public void onCompletion(RecordMetadata metadata, Exception e) {
+                                       if (e != null) {
+                                               LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
+                                       }
+                               }
+                       };
+               }
+               else {
+                       callback = new Callback() {
+                               @Override
+                               public void onCompletion(RecordMetadata metadata, Exception exception) {
+                                       if (exception != null && asyncException == null) {
+                                               asyncException = exception;
+                                       }
+                               }
+                       };
+               }
+       }
+
+       /**
+        * Called when new data arrives to the sink, and forwards it to Kafka.
+        *
+        * @param next
+        *              The incoming data
+        */
+       @Override
+       public void invoke(IN next) throws Exception {
+               // propagate asynchronous errors
+               checkErroneous();
+
+               byte[] serializedKey = schema.serializeKey(next);
+               byte[] serializedValue = schema.serializeValue(next);
+               ProducerRecord<byte[], byte[]> record;
+               if (partitioner == null) {
+                       record = new ProducerRecord<>(topicId, serializedKey, serializedValue);
+               } else {
+                       record = new ProducerRecord<>(topicId, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue);
+               }
+
+               producer.send(record, callback);
+       }
+
+
+       @Override
+       public void close() throws Exception {
+               if (producer != null) {
+                       producer.close();
+               }
+               
+               // make sure we propagate pending errors
+               checkErroneous();
+       }
+
+
+       // ----------------------------------- Utilities --------------------------
+
+       protected void checkErroneous() throws Exception {
+               Exception e = asyncException;
+               if (e != null) {
+                       // prevent double throwing
+                       asyncException = null;
+                       throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
+               }
+       }
+       
+       public static Properties getPropertiesFromBrokerList(String brokerList) {
+               String[] elements = brokerList.split(",");
+               
+               // validate the broker addresses
+               for (String broker: elements) {
+                       NetUtils.getCorrectHostnamePort(broker);
+               }
+               
+               Properties props = new Properties();
+               props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+               return props;
+       }
+}
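
As a usage note for the last helper: getPropertiesFromBrokerList() validates every host:port entry and returns a Properties object with only bootstrap.servers set. A minimal sketch, with placeholder broker addresses that are not part of the patch:

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;

    public class BrokerListPropertiesSketch {

        public static void main(String[] args) {
            // each entry must parse as host:port, otherwise validation fails with an exception
            Properties props =
                    FlinkKafkaProducerBase.getPropertiesFromBrokerList("broker1:9092,broker2:9092");

            // prints: broker1:9092,broker2:9092
            System.out.println(props.getProperty("bootstrap.servers"));
        }
    }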
