http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
deleted file mode 100644
index d495327..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internal;
-
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.SerializedValue;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer 
API.
- * 
- * @param <T> The type of elements produced by the fetcher.
- */
-public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(Kafka09Fetcher.class);
-
-       // 
------------------------------------------------------------------------
-
-       /** The schema to convert between Kafka's byte messages, and Flink's 
objects */
-       private final KeyedDeserializationSchema<T> deserializer;
-
-       /** The handover of data and exceptions between the consumer thread and 
the task thread */
-       private final Handover handover;
-
-       /** The thread that runs the actual KafkaConsumer and hands the record 
batches to this fetcher */
-       private final KafkaConsumerThread consumerThread;
-
-       /** Flag to mark the main work loop as alive */
-       private volatile boolean running = true;
-
-       // 
------------------------------------------------------------------------
-
-       public Kafka09Fetcher(
-                       SourceContext<T> sourceContext,
-                       List<KafkaTopicPartition> assignedPartitions,
-                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
-                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
-                       ProcessingTimeService processingTimeProvider,
-                       long autoWatermarkInterval,
-                       ClassLoader userCodeClassLoader,
-                       boolean enableCheckpointing,
-                       String taskNameWithSubtasks,
-                       MetricGroup metricGroup,
-                       KeyedDeserializationSchema<T> deserializer,
-                       Properties kafkaProperties,
-                       long pollTimeout,
-                       boolean useMetrics) throws Exception
-       {
-               super(
-                               sourceContext,
-                               assignedPartitions,
-                               watermarksPeriodic,
-                               watermarksPunctuated,
-                               processingTimeProvider,
-                               autoWatermarkInterval,
-                               userCodeClassLoader,
-                               useMetrics);
-
-               this.deserializer = deserializer;
-               this.handover = new Handover();
-
-               final MetricGroup kafkaMetricGroup = 
metricGroup.addGroup("KafkaConsumer");
-               addOffsetStateGauge(kafkaMetricGroup);
-
-               // if checkpointing is enabled, we are not automatically 
committing to Kafka.
-               kafkaProperties.setProperty(
-                               ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-                               Boolean.toString(!enableCheckpointing));
-               
-               this.consumerThread = new KafkaConsumerThread(
-                               LOG,
-                               handover,
-                               kafkaProperties,
-                               subscribedPartitions(),
-                               kafkaMetricGroup,
-                               createCallBridge(),
-                               getFetcherName() + " for " + 
taskNameWithSubtasks,
-                               pollTimeout,
-                               useMetrics);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Fetcher work methods
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void runFetchLoop() throws Exception {
-               try {
-                       final Handover handover = this.handover;
-
-                       // kick off the actual Kafka consumer
-                       consumerThread.start();
-
-                       while (running) {
-                               // this blocks until we get the next records
-                               // it automatically re-throws exceptions 
encountered in the fetcher thread
-                               final ConsumerRecords<byte[], byte[]> records = 
handover.pollNext();
-
-                               // get the records for each topic partition
-                               for (KafkaTopicPartitionState<TopicPartition> 
partition : subscribedPartitions()) {
-
-                                       List<ConsumerRecord<byte[], byte[]>> 
partitionRecords =
-                                                       
records.records(partition.getKafkaPartitionHandle());
-
-                                       for (ConsumerRecord<byte[], byte[]> 
record : partitionRecords) {
-
-                                               final T value = 
deserializer.deserialize(
-                                                               record.key(), 
record.value(),
-                                                               record.topic(), 
record.partition(), record.offset());
-
-                                               if 
(deserializer.isEndOfStream(value)) {
-                                                       // end of stream 
signaled
-                                                       running = false;
-                                                       break;
-                                               }
-
-                                               // emit the actual record. this 
also updates offset state atomically
-                                               // and deals with timestamps 
and watermark generation
-                                               emitRecord(value, partition, 
record.offset(), record);
-                                       }
-                               }
-                       }
-               }
-               finally {
-                       // this signals the consumer thread that no more work 
is to be done
-                       consumerThread.shutdown();
-               }
-
-               // on a clean exit, wait for the runner thread
-               try {
-                       consumerThread.join();
-               }
-               catch (InterruptedException e) {
-                       // may be the result of a wake-up interruption after an 
exception.
-                       // we ignore this here and only restore the 
interruption state
-                       Thread.currentThread().interrupt();
-               }
-       }
-
-       @Override
-       public void cancel() {
-               // flag the main thread to exit. A thread interrupt will come 
anyway.
-               running = false;
-               handover.close();
-               consumerThread.shutdown();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  The below methods are overridden in the 0.10 fetcher, which 
otherwise
-       //   reuses most of the 0.9 fetcher behavior
-       // 
------------------------------------------------------------------------
-
-       protected void emitRecord(
-                       T record,
-                       KafkaTopicPartitionState<TopicPartition> partition,
-                       long offset,
-                       @SuppressWarnings("UnusedParameters") ConsumerRecord<?, 
?> consumerRecord) throws Exception {
-
-               // the 0.9 Fetcher does not try to extract a timestamp
-               emitRecord(record, partition, offset);
-       }
-
-       /**
-        * Gets the name of this fetcher, for thread naming and logging 
purposes.
-        */
-       protected String getFetcherName() {
-               return "Kafka 0.9 Fetcher";
-       }
-
-       protected KafkaConsumerCallBridge createCallBridge() {
-               return new KafkaConsumerCallBridge();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Implement Methods of the AbstractFetcher
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition 
partition) {
-               return new TopicPartition(partition.getTopic(), 
partition.getPartition());
-       }
-
-       @Override
-       public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> 
offsets) throws Exception {
-               KafkaTopicPartitionState<TopicPartition>[] partitions = 
subscribedPartitions();
-               Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new 
HashMap<>(partitions.length);
-
-               for (KafkaTopicPartitionState<TopicPartition> partition : 
partitions) {
-                       Long lastProcessedOffset = 
offsets.get(partition.getKafkaTopicPartition());
-                       if (lastProcessedOffset != null) {
-                               // committed offsets through the KafkaConsumer 
need to be 1 more than the last processed offset.
-                               // This does not affect Flink's 
checkpoints/saved state.
-                               long offsetToCommit = lastProcessedOffset + 1;
-
-                               
offsetsToCommit.put(partition.getKafkaPartitionHandle(), new 
OffsetAndMetadata(offsetToCommit));
-                               partition.setCommittedOffset(offsetToCommit);
-                       }
-               }
-
-               // record the work to be committed by the main consumer thread 
and make sure the consumer notices that
-               consumerThread.setOffsetsToCommit(offsetsToCommit);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
deleted file mode 100644
index c17aae6..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internal;
-
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.List;
-
-/**
- * The ConsumerCallBridge simply calls methods on the {@link KafkaConsumer}.
- * 
- * This indirection is necessary, because Kafka broke binary compatibility 
between 0.9 and 0.10,
- * for example changing {@code assign(List)} to {@code assign(Collection)}.
- * 
- * Because of that, we need two versions whose compiled code goes against 
different method signatures.
- * Even though the source of subclasses may look identical, the byte code will 
be different, because they
- * are compiled against different dependencies.
- */
-public class KafkaConsumerCallBridge {
-
-       public void assignPartitions(KafkaConsumer<?, ?> consumer, 
List<TopicPartition> topicPartitions) throws Exception {
-               consumer.assign(topicPartitions);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
deleted file mode 100644
index 9cfa840..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internal;
-
-import org.apache.flink.metrics.MetricGroup;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The thread that runs the {@link KafkaConsumer}, connecting to the brokers 
and polling records.
- * The thread pushes the data into a {@link Handover} to be picked up by the 
fetcher that will
- * deserialize and emit the records.
- * 
- * <p><b>IMPORTANT:</b> This thread must not be interrupted when attempting to 
shut it down.
- * The Kafka consumer code was found to not always handle interrupts well, and 
to even
- * deadlock in certain situations.
- * 
- * <p>Implementation Note: This code is written to be reusable in later 
versions of the KafkaConsumer.
- * Because Kafka is not maintaining binary compatibility, we use a "call 
bridge" as an indirection
- * to the KafkaConsumer calls that change signature.
- */
-public class KafkaConsumerThread extends Thread {
-
-       /** Logger for this consumer */
-       private final Logger log;
-
-       /** The handover of data and exceptions between the consumer thread and 
the task thread */
-       private final Handover handover;
-
-       /** The next offsets that this consumer thread should commit */
-       private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> 
nextOffsetsToCommit;
-
-       /** The configuration for the Kafka consumer */
-       private final Properties kafkaProperties;
-
-       /** The partitions that this consumer reads from */ 
-       private final KafkaTopicPartitionState<TopicPartition>[] 
subscribedPartitions;
-
-       /** We get this from the outside to publish metrics. **/
-       private final MetricGroup kafkaMetricGroup;
-
-       /** The indirections on KafkaConsumer methods, for cases where 
KafkaConsumer compatibility is broken */
-       private final KafkaConsumerCallBridge consumerCallBridge;
-
-       /** The maximum number of milliseconds to wait for a fetch batch */
-       private final long pollTimeout;
-
-       /** Flag whether to add Kafka's metrics to the Flink metrics */
-       private final boolean useMetrics;
-
-       /** Reference to the Kafka consumer, once it is created */
-       private volatile KafkaConsumer<byte[], byte[]> consumer;
-
-       /** Flag to mark the main work loop as alive */
-       private volatile boolean running;
-
-       /** Flag tracking whether an offset commit request is currently in progress (not yet completed) */
-       private volatile boolean commitInProgress;
-
-
-       public KafkaConsumerThread(
-                       Logger log,
-                       Handover handover,
-                       Properties kafkaProperties,
-                       KafkaTopicPartitionState<TopicPartition>[] 
subscribedPartitions,
-                       MetricGroup kafkaMetricGroup,
-                       KafkaConsumerCallBridge consumerCallBridge,
-                       String threadName,
-                       long pollTimeout,
-                       boolean useMetrics) {
-
-               super(threadName);
-               setDaemon(true);
-
-               this.log = checkNotNull(log);
-               this.handover = checkNotNull(handover);
-               this.kafkaProperties = checkNotNull(kafkaProperties);
-               this.subscribedPartitions = checkNotNull(subscribedPartitions);
-               this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
-               this.consumerCallBridge = checkNotNull(consumerCallBridge);
-               this.pollTimeout = pollTimeout;
-               this.useMetrics = useMetrics;
-
-               this.nextOffsetsToCommit = new AtomicReference<>();
-               this.running = true;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void run() {
-               // early exit check
-               if (!running) {
-                       return;
-               }
-
-               // this is the means to talk to FlinkKafkaConsumer's main thread
-               final Handover handover = this.handover;
-
-               // This method initializes the KafkaConsumer and guarantees it 
is torn down properly.
-               // This is important, because the consumer has multi-threading 
issues,
-               // including concurrent 'close()' calls.
-               final KafkaConsumer<byte[], byte[]> consumer;
-               try {
-                       consumer = new KafkaConsumer<>(kafkaProperties);
-               }
-               catch (Throwable t) {
-                       handover.reportError(t);
-                       return;
-               }
-
-               // from here on, the consumer is guaranteed to be closed 
properly
-               try {
-                       // The callback invoked by Kafka once an offset commit 
is complete
-                       final OffsetCommitCallback offsetCommitCallback = new 
CommitCallback();
-
-                       // tell the consumer which partitions to work with
-                       consumerCallBridge.assignPartitions(consumer, 
convertKafkaPartitions(subscribedPartitions));
-
-                       // register Kafka's very own metrics in Flink's metric 
reporters
-                       if (useMetrics) {
-                               // register Kafka metrics to Flink
-                               Map<MetricName, ? extends Metric> metrics = 
consumer.metrics();
-                               if (metrics == null) {
-                                       // MapR's Kafka implementation returns 
null here.
-                                       log.info("Consumer implementation does 
not support metrics");
-                               } else {
-                                       // we have Kafka metrics, register them
-                                       for (Map.Entry<MetricName, ? extends 
Metric> metric: metrics.entrySet()) {
-                                               
kafkaMetricGroup.gauge(metric.getKey().name(), new 
KafkaMetricWrapper(metric.getValue()));
-                                       }
-                               }
-                       }
-
-                       // early exit check
-                       if (!running) {
-                               return;
-                       }
-
-                       // seek the consumer to the initial offsets
-                       for (KafkaTopicPartitionState<TopicPartition> partition 
: subscribedPartitions) {
-                               if (partition.isOffsetDefined()) {
-                                       log.info("Partition {} has restored 
initial offsets {} from checkpoint / savepoint; " +
-                                                       "seeking the consumer 
to position {}",
-                                                       
partition.getKafkaPartitionHandle(), partition.getOffset(), 
partition.getOffset() + 1);
-
-                                       
consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
-                               }
-                               else {
-                                       // for partitions that do not have 
offsets restored from a checkpoint/savepoint,
-                                       // we need to define our internal 
offset state for them using the initial offsets retrieved from Kafka
-                                       // by the KafkaConsumer, so that they 
are correctly checkpointed and committed on the next checkpoint
-
-                                       long fetchedOffset = 
consumer.position(partition.getKafkaPartitionHandle());
-
-                                       log.info("Partition {} has no initial 
offset; the consumer has position {}, " +
-                                                       "so the initial offset 
will be set to {}",
-                                                       
partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1);
-
-                                       // the fetched offset represents the 
next record to process, so we need to subtract 1 from it
-                                       partition.setOffset(fetchedOffset - 1);
-                               }
-                       }
-
-                       // from now on, external operations may call the 
consumer
-                       this.consumer = consumer;
-
-                       // the latest bulk of records. may carry across the 
loop if the thread is woken up
-                       // from blocking on the handover
-                       ConsumerRecords<byte[], byte[]> records = null;
-
-                       // main fetch loop
-                       while (running) {
-
-                               // check if there is something to commit
-                               if (!commitInProgress) {
-                                       // get and reset the work-to-be 
committed, so we don't repeatedly commit the same
-                                       final Map<TopicPartition, 
OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
-
-                                       if (toCommit != null) {
-                                               log.debug("Sending async offset 
commit request to Kafka broker");
-
-                                               // also record that a commit is 
already in progress
-                                               // the order here matters! 
first set the flag, then send the commit command.
-                                               commitInProgress = true;
-                                               consumer.commitAsync(toCommit, 
offsetCommitCallback);
-                                       }
-                               }
-
-                               // get the next batch of records, unless we did 
not manage to hand the old batch over
-                               if (records == null) {
-                                       try {
-                                               records = 
consumer.poll(pollTimeout);
-                                       }
-                                       catch (WakeupException we) {
-                                               continue;
-                                       }
-                               }
-
-                               try {
-                                       handover.produce(records);
-                                       records = null;
-                               }
-                               catch (Handover.WakeupException e) {
-                                       // fall through the loop
-                               }
-                       }
-                       // end main fetch loop
-               }
-               catch (Throwable t) {
-                       // let the main thread know and exit
-                       // it may be that this exception comes because the main 
thread closed the handover, in
-                       // which case the below reporting is irrelevant, but 
does not hurt either
-                       handover.reportError(t);
-               }
-               finally {
-                       // make sure the handover is closed if it is not 
already closed or has an error
-                       handover.close();
-
-                       // make sure the KafkaConsumer is closed
-                       try {
-                               consumer.close();
-                       }
-                       catch (Throwable t) {
-                               log.warn("Error while closing Kafka consumer", 
t);
-                       }
-               }
-       }
-
-       /**
-        * Shuts this thread down, waking up the thread gracefully if blocked 
(without Thread.interrupt() calls).
-        */
-       public void shutdown() {
-               running = false;
-
-               // We cannot call close() on the KafkaConsumer, because it will 
actually throw
-               // an exception if a concurrent call is in progress
-
-               // this wakes up the consumer if it is blocked handing over 
records
-               handover.wakeupProducer();
-
-               // this wakes up the consumer if it is blocked in a kafka poll 
-               if (consumer != null) {
-                       consumer.wakeup();
-               }
-       }
-
-       /**
-        * Tells this thread to commit a set of offsets. This method does not 
block, the committing
-        * operation will happen asynchronously.
-        * 
-        * <p>Only one commit operation may be pending at any time. If 
committing takes longer than
-        * the interval at which this method is called, then some commits 
may be skipped due to being
-        * superseded by newer ones.
-        * 
-        * @param offsetsToCommit The offsets to commit
-        */
-       public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> 
offsetsToCommit) {
-               // record the work to be committed by the main consumer thread 
and make sure the consumer notices that
-               if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
-                       log.warn("Committing offsets to Kafka takes longer than 
the checkpoint interval. " +
-                                       "Skipping commit of previous offsets 
because newer complete checkpoint offsets are available. " +
-                                       "This does not compromise Flink's 
checkpoint integrity.");
-               }
-
-               // if the consumer is blocked in a poll() or handover 
operation, wake it up to commit soon
-               handover.wakeupProducer();
-               if (consumer != null) {
-                       consumer.wakeup();
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       private static List<TopicPartition> 
convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
-               ArrayList<TopicPartition> result = new 
ArrayList<>(partitions.length);
-               for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
-                       result.add(p.getKafkaPartitionHandle());
-               }
-               return result;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private class CommitCallback implements OffsetCommitCallback {
-
-               @Override
-               public void onComplete(Map<TopicPartition, OffsetAndMetadata> 
offsets, Exception ex) {
-                       commitInProgress = false;
-
-                       if (ex != null) {
-                               log.warn("Committing offsets to Kafka failed. 
This does not compromise Flink's checkpoints.", ex);
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
deleted file mode 100644
index 6bdfb48..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
deleted file mode 100644
index 7a82365..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.core.testutils.MultiShotLatch;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.internal.Handover;
-import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
-import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.common.TopicPartition;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;
-import static org.powermock.api.mockito.PowerMockito.doAnswer;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-/**
- * Unit tests for the {@link Kafka09Fetcher}.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaConsumerThread.class)
-public class Kafka09FetcherTest {
-
-       @Test
-       public void testCommitDoesNotBlock() throws Exception {
-
-               // test data
-               final KafkaTopicPartition testPartition = new 
KafkaTopicPartition("test", 42);
-               final Map<KafkaTopicPartition, Long> testCommitData = new 
HashMap<>();
-               testCommitData.put(testPartition, 11L);
-
-               // to synchronize when the consumer is in its blocking method
-               final OneShotLatch sync = new OneShotLatch();
-
-               // ----- the mock consumer with blocking poll calls ----
-               final MultiShotLatch blockerLatch = new MultiShotLatch();
-               
-               KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-               when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
-                       
-                       @Override
-                       public ConsumerRecords<?, ?> answer(InvocationOnMock 
invocation) throws InterruptedException {
-                               sync.trigger();
-                               blockerLatch.await();
-                               return ConsumerRecords.empty();
-                       }
-               });
-
-               doAnswer(new Answer<Void>() {
-                       @Override
-                       public Void answer(InvocationOnMock invocation) {
-                               blockerLatch.trigger();
-                               return null;
-                       }
-               }).when(mockConsumer).wakeup();
-
-               // make sure the fetcher creates the mock consumer
-               
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-               // ----- create the test fetcher -----
-
-               @SuppressWarnings("unchecked")
-               SourceContext<String> sourceContext = mock(SourceContext.class);
-               List<KafkaTopicPartition> topics = 
Collections.singletonList(new KafkaTopicPartition("test", 42));
-               KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
-               final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
-                               sourceContext,
-                               topics,
-                               null, /* periodic watermark extractor */
-                               null, /* punctuated watermark extractor */
-                               new TestProcessingTimeService(),
-                               10, /* watermark interval */
-                               this.getClass().getClassLoader(),
-                               true, /* checkpointing */
-                               "task_name",
-                               new UnregisteredMetricsGroup(),
-                               schema,
-                               new Properties(),
-                               0L,
-                               false);
-
-               // ----- run the fetcher -----
-
-               final AtomicReference<Throwable> error = new 
AtomicReference<>();
-               final Thread fetcherRunner = new Thread("fetcher runner") {
-
-                       @Override
-                       public void run() {
-                               try {
-                                       fetcher.runFetchLoop();
-                               } catch (Throwable t) {
-                                       error.set(t);
-                               }
-                       }
-               };
-               fetcherRunner.start();
-
-               // wait until the fetcher has reached the method of interest
-               sync.await();
-
-               // ----- trigger the offset commit -----
-               
-               final AtomicReference<Throwable> commitError = new 
AtomicReference<>();
-               final Thread committer = new Thread("committer runner") {
-                       @Override
-                       public void run() {
-                               try {
-                                       
fetcher.commitInternalOffsetsToKafka(testCommitData);
-                               } catch (Throwable t) {
-                                       commitError.set(t);
-                               }
-                       }
-               };
-               committer.start();
-
-               // ----- ensure that the committer finishes in time  -----
-               committer.join(30000);
-               assertFalse("The committer did not finish in time", 
committer.isAlive());
-
-               // ----- test done, wait till the fetcher is done for a clean 
shutdown -----
-               fetcher.cancel();
-               fetcherRunner.join();
-
-               // check that there were no errors in the fetcher
-               final Throwable fetcherError = error.get();
-               if (fetcherError != null && !(fetcherError instanceof 
Handover.ClosedException)) {
-                       throw new Exception("Exception in the fetcher", 
fetcherError);
-               }
-               final Throwable committerError = commitError.get();
-               if (committerError != null) {
-                       throw new Exception("Exception in the committer", 
committerError);
-               }
-       }
-
-       @Test
-       public void ensureOffsetsGetCommitted() throws Exception {
-               
-               // test data
-               final KafkaTopicPartition testPartition1 = new 
KafkaTopicPartition("test", 42);
-               final KafkaTopicPartition testPartition2 = new 
KafkaTopicPartition("another", 99);
-               
-               final Map<KafkaTopicPartition, Long> testCommitData1 = new 
HashMap<>();
-               testCommitData1.put(testPartition1, 11L);
-               testCommitData1.put(testPartition2, 18L);
-
-               final Map<KafkaTopicPartition, Long> testCommitData2 = new 
HashMap<>();
-               testCommitData2.put(testPartition1, 19L);
-               testCommitData2.put(testPartition2, 28L);
-
-               final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> 
commitStore = new LinkedBlockingQueue<>();
-
-
-               // ----- the mock consumer with poll(), wakeup(), and 
commit(A)sync calls ----
-
-               final MultiShotLatch blockerLatch = new MultiShotLatch();
-
-               KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-
-               when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
-                       @Override
-                       public ConsumerRecords<?, ?> answer(InvocationOnMock 
invocation) throws InterruptedException {
-                               blockerLatch.await();
-                               return ConsumerRecords.empty();
-                       }
-               });
-
-               doAnswer(new Answer<Void>() {
-                       @Override
-                       public Void answer(InvocationOnMock invocation) {
-                               blockerLatch.trigger();
-                               return null;
-                       }
-               }).when(mockConsumer).wakeup();
-
-               doAnswer(new Answer<Void>() {
-                       @Override
-                       public Void answer(InvocationOnMock invocation) {
-                               @SuppressWarnings("unchecked")
-                               Map<TopicPartition, OffsetAndMetadata> offsets 
= 
-                                               (Map<TopicPartition, 
OffsetAndMetadata>) invocation.getArguments()[0];
-
-                               OffsetCommitCallback callback = 
(OffsetCommitCallback) invocation.getArguments()[1];
-
-                               commitStore.add(offsets);
-                               callback.onComplete(offsets, null);
-
-                               return null; 
-                       }
-               }).when(mockConsumer).commitAsync(
-                               Mockito.<Map<TopicPartition, 
OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
-
-               // make sure the fetcher creates the mock consumer
-               
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-               // ----- create the test fetcher -----
-
-               @SuppressWarnings("unchecked")
-               SourceContext<String> sourceContext = mock(SourceContext.class);
-               List<KafkaTopicPartition> topics = 
Collections.singletonList(new KafkaTopicPartition("test", 42));
-               KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
-               final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
-                               sourceContext,
-                               topics,
-                               null, /* periodic watermark extractor */
-                               null, /* punctuated watermark extractor */
-                               new TestProcessingTimeService(),
-                               10, /* watermark interval */
-                               this.getClass().getClassLoader(),
-                               true, /* checkpointing */
-                               "task_name",
-                               new UnregisteredMetricsGroup(),
-                               schema,
-                               new Properties(),
-                               0L,
-                               false);
-
-
-               // ----- run the fetcher -----
-
-               final AtomicReference<Throwable> error = new 
AtomicReference<>();
-               final Thread fetcherRunner = new Thread("fetcher runner") {
-
-                       @Override
-                       public void run() {
-                               try {
-                                       fetcher.runFetchLoop();
-                               } catch (Throwable t) {
-                                       error.set(t);
-                               }
-                       }
-               };
-               fetcherRunner.start();
-
-               // ----- trigger the first offset commit -----
-
-               fetcher.commitInternalOffsetsToKafka(testCommitData1);
-               Map<TopicPartition, OffsetAndMetadata> result1 = 
commitStore.take();
-
-               for (Entry<TopicPartition, OffsetAndMetadata> entry : 
result1.entrySet()) {
-                       TopicPartition partition = entry.getKey();
-                       if (partition.topic().equals("test")) {
-                               assertEquals(42, partition.partition());
-                               assertEquals(12L, entry.getValue().offset());
-                       }
-                       else if (partition.topic().equals("another")) {
-                               assertEquals(99, partition.partition());
-                               assertEquals(17L, entry.getValue().offset());
-                       }
-               }
-
-               // ----- trigger the second offset commit -----
-
-               fetcher.commitInternalOffsetsToKafka(testCommitData2);
-               Map<TopicPartition, OffsetAndMetadata> result2 = 
commitStore.take();
-
-               for (Entry<TopicPartition, OffsetAndMetadata> entry : 
result2.entrySet()) {
-                       TopicPartition partition = entry.getKey();
-                       if (partition.topic().equals("test")) {
-                               assertEquals(42, partition.partition());
-                               assertEquals(20L, entry.getValue().offset());
-                       }
-                       else if (partition.topic().equals("another")) {
-                               assertEquals(99, partition.partition());
-                               assertEquals(27L, entry.getValue().offset());
-                       }
-               }
-               
-               // ----- test done, wait till the fetcher is done for a clean 
shutdown -----
-               fetcher.cancel();
-               fetcherRunner.join();
-
-               // check that there were no errors in the fetcher
-               final Throwable caughtError = error.get();
-               if (caughtError != null && !(caughtError instanceof 
Handover.ClosedException)) {
-                       throw new Exception("Exception in the fetcher", 
caughtError);
-               }
-       }
-
-       @Test
-       public void testCancellationWhenEmitBlocks() throws Exception {
-
-               // ----- some test data -----
-
-               final String topic = "test-topic";
-               final int partition = 3;
-               final byte[] payload = new byte[] {1, 2, 3, 4};
-
-               final List<ConsumerRecord<byte[], byte[]>> records = 
Arrays.asList(
-                               new ConsumerRecord<byte[], byte[]>(topic, 
partition, 15, payload, payload),
-                               new ConsumerRecord<byte[], byte[]>(topic, 
partition, 16, payload, payload),
-                               new ConsumerRecord<byte[], byte[]>(topic, 
partition, 17, payload, payload));
-
-               final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
data = new HashMap<>();
-               data.put(new TopicPartition(topic, partition), records);
-
-               final ConsumerRecords<byte[], byte[]> consumerRecords = new 
ConsumerRecords<>(data);
-
-               // ----- the test consumer -----
-
-               final KafkaConsumer<?, ?> mockConsumer = 
mock(KafkaConsumer.class);
-               when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
-                       @Override
-                       public ConsumerRecords<?, ?> answer(InvocationOnMock 
invocation) {
-                               return consumerRecords;
-                       }
-               });
-
-               
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-               // ----- build a fetcher -----
-
-               BlockingSourceContext<String> sourceContext = new 
BlockingSourceContext<>();
-               List<KafkaTopicPartition> topics = 
Collections.singletonList(new KafkaTopicPartition(topic, partition));
-               KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
-               final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
-                               sourceContext,
-                               topics,
-                               null, /* periodic watermark extractor */
-                               null, /* punctuated watermark extractor */
-                               new TestProcessingTimeService(),
-                               10, /* watermark interval */
-                               this.getClass().getClassLoader(),
-                               true, /* checkpointing */
-                               "task_name",
-                               new UnregisteredMetricsGroup(),
-                               schema,
-                               new Properties(),
-                               0L,
-                               false);
-
-
-               // ----- run the fetcher -----
-
-               final AtomicReference<Throwable> error = new 
AtomicReference<>();
-               final Thread fetcherRunner = new Thread("fetcher runner") {
-
-                       @Override
-                       public void run() {
-                               try {
-                                       fetcher.runFetchLoop();
-                               } catch (Throwable t) {
-                                       error.set(t);
-                               }
-                       }
-               };
-               fetcherRunner.start();
-
-               // wait until the thread started to emit records to the source 
context
-               sourceContext.waitTillHasBlocker();
-
-               // now we try to cancel the fetcher, including the interruption 
usually done on the task thread
-               // once it has finished, there must be no more thread blocked 
on the source context
-               fetcher.cancel();
-               fetcherRunner.interrupt();
-               fetcherRunner.join();
-
-               assertFalse("fetcher threads did not properly finish", 
sourceContext.isStillBlocking());
-       }
-
-       // 
------------------------------------------------------------------------
-       //  test utilities
-       // 
------------------------------------------------------------------------
-
-       private static final class BlockingSourceContext<T> implements 
SourceContext<T> {
-
-               private final ReentrantLock lock = new ReentrantLock();
-               private final OneShotLatch inBlocking = new OneShotLatch();
-
-               @Override
-               public void collect(T element) {
-                       block();
-               }
-
-               @Override
-               public void collectWithTimestamp(T element, long timestamp) {
-                       block();
-               }
-
-               @Override
-               public void emitWatermark(Watermark mark) {
-                       block();
-               }
-
-               @Override
-               public Object getCheckpointLock() {
-                       return new Object();
-               }
-
-               @Override
-               public void close() {}
-
-               public void waitTillHasBlocker() throws InterruptedException {
-                       inBlocking.await();
-               }
-
-               public boolean isStillBlocking() {
-                       return lock.isLocked();
-               }
-
-               @SuppressWarnings({"InfiniteLoopStatement", 
"SynchronizationOnLocalVariableOrMethodParameter"})
-               private void block() {
-                       lock.lock();
-                       try {
-                               inBlocking.trigger();
-
-                               // put this thread to sleep indefinitely
-                               final Object o = new Object();
-                               while (true) {
-                                       synchronized (o) {
-                                               o.wait();
-                                       }
-                               }
-                       }
-                       catch (InterruptedException e) {
-                               // exit cleanly, simply reset the interruption 
flag
-                               Thread.currentThread().interrupt();
-                       }
-                       finally {
-                               lock.unlock();
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
deleted file mode 100644
index d18e2a9..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Test;
-
-public class Kafka09ITCase extends KafkaConsumerTestBase {
-
-       // 
------------------------------------------------------------------------
-       //  Suite of Tests
-       // 
------------------------------------------------------------------------
-
-       @Test(timeout = 60000)
-       public void testFailOnNoBroker() throws Exception {
-               runFailOnNoBrokerTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testConcurrentProducerConsumerTopology() throws Exception {
-               runSimpleConcurrentProducerConsumerTopology();
-       }
-
-
-       @Test(timeout = 60000)
-       public void testKeyValueSupport() throws Exception {
-               runKeyValueTest();
-       }
-
-       // --- canceling / failures ---
-
-       @Test(timeout = 60000)
-       public void testCancelingEmptyTopic() throws Exception {
-               runCancelingOnEmptyInputTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testCancelingFullTopic() throws Exception {
-               runCancelingOnFullInputTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testFailOnDeploy() throws Exception {
-               runFailOnDeployTest();
-       }
-
-
-       // --- source to partition mappings and exactly once ---
-
-       @Test(timeout = 60000)
-       public void testOneToOneSources() throws Exception {
-               runOneToOneExactlyOnceTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testOneSourceMultiplePartitions() throws Exception {
-               runOneSourceMultiplePartitionsExactlyOnceTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testMultipleSourcesOnePartition() throws Exception {
-               runMultipleSourcesOnePartitionExactlyOnceTest();
-       }
-
-       // --- broker failure ---
-
-       @Test(timeout = 60000)
-       public void testBrokerFailure() throws Exception {
-               runBrokerFailureTest();
-       }
-
-       // --- special executions ---
-
-       @Test(timeout = 60000)
-       public void testBigRecordJob() throws Exception {
-               runBigRecordTestTopology();
-       }
-
-       @Test(timeout = 60000)
-       public void testMultipleTopics() throws Exception {
-               runProduceConsumeMultipleTopics();
-       }
-
-       @Test(timeout = 60000)
-       public void testAllDeletes() throws Exception {
-               runAllDeletesTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testEndOfStream() throws Exception {
-               runEndOfStreamTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testMetrics() throws Throwable {
-               runMetricsTest();
-       }
-
-       // --- offset committing ---
-
-       @Test(timeout = 60000)
-       public void testCommitOffsetsToKafka() throws Exception {
-               runCommitOffsetsToKafka();
-       }
-
-       @Test(timeout = 60000)
-       public void testStartFromKafkaCommitOffsets() throws Exception {
-               runStartFromKafkaCommitOffsets();
-       }
-
-       @Test(timeout = 60000)
-       public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
-               runAutoOffsetRetrievalAndCommitToKafka();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
deleted file mode 100644
index 45f70ac..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import java.util.Properties;
-
-public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
-
-       @Override
-       protected KafkaTableSink createTableSink(String topic, Properties 
properties, KafkaPartitioner<Row> partitioner,
-                       final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
-               return new Kafka09JsonTableSink(topic, properties, partitioner) 
{
-                       @Override
-                       protected FlinkKafkaProducerBase<Row> 
createKafkaProducer(String topic, Properties properties,
-                                       SerializationSchema<Row> 
serializationSchema, KafkaPartitioner<Row> partitioner) {
-                               return kafkaProducer;
-                       }
-               };
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       protected SerializationSchema<Row> getSerializationSchema() {
-               return new JsonRowSerializationSchema(FIELD_NAMES);
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
deleted file mode 100644
index 4a75f50..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import java.util.Properties;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-
-public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase {
-
-       @Override
-       protected KafkaTableSource createTableSource(String topic, Properties 
properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
-               return new Kafka09JsonTableSource(topic, properties, 
fieldNames, typeInfo);
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
-               return (Class) JsonRowDeserializationSchema.class;
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
-               return (Class) FlinkKafkaConsumer09.class;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
deleted file mode 100644
index ae4f5b2..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class Kafka09ProducerITCase extends KafkaProducerTestBase {
-
-       @Test
-       public void testCustomPartitioning() {
-               runCustomPartitioningTest();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
deleted file mode 100644
index e748537..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.test.util.SecureTestEnvironment;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/*
- * Kafka Secure Connection (kerberos) IT test case
- */
-public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {
-
-       protected static final Logger LOG = 
LoggerFactory.getLogger(Kafka09SecuredRunITCase.class);
-
-       @BeforeClass
-       public static void prepare() throws IOException, ClassNotFoundException 
{
-               
LOG.info("-------------------------------------------------------------------------");
-               LOG.info("    Starting Kafka09SecuredRunITCase ");
-               
LOG.info("-------------------------------------------------------------------------");
-
-               SecureTestEnvironment.prepare(tempFolder);
-               
SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
-
-               startClusters(true);
-       }
-
-       @AfterClass
-       public static void shutDownServices() {
-               shutdownClusters();
-               SecureTestEnvironment.cleanup();
-       }
-
-
-       //timeout interval is large since in Travis, ZK connection timeout 
occurs frequently
-       //The timeout for the test case is 2 times timeout of ZK connection
-       @Test(timeout = 600000)
-       public void testMultipleTopics() throws Exception {
-               runProduceConsumeMultipleTopics();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
deleted file mode 100644
index 18b2aec..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import 
org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Collections;
-import java.util.concurrent.Future;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(FlinkKafkaProducerBase.class)
-public class KafkaProducerTest extends TestLogger {
-       
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testPropagateExceptions() {
-               try {
-                       // mock kafka producer
-                       KafkaProducer<?, ?> kafkaProducerMock = 
mock(KafkaProducer.class);
-                       
-                       // partition setup
-                       
when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
-                               // returning a unmodifiable list to mimic 
KafkaProducer#partitionsFor() behaviour
-                               Collections.singletonList(new 
PartitionInfo("mock_topic", 42, null, null, null)));
-
-                       // failure when trying to send an element
-                       when(kafkaProducerMock.send(any(ProducerRecord.class), 
any(Callback.class)))
-                               .thenAnswer(new 
Answer<Future<RecordMetadata>>() {
-                                       @Override
-                                       public Future<RecordMetadata> 
answer(InvocationOnMock invocation) throws Throwable {
-                                               Callback callback = (Callback) 
invocation.getArguments()[1];
-                                               callback.onCompletion(null, new 
Exception("Test error"));
-                                               return null;
-                                       }
-                               });
-                       
-                       // make sure the FlinkKafkaProducer instantiates our 
mock producer
-                       
whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
-                       
-                       // (1) producer that propagates errors
-
-                       FlinkKafkaProducer09<String> producerPropagating = new 
FlinkKafkaProducer09<>(
-                                       "mock_topic", new SimpleStringSchema(), 
FakeStandardProducerConfig.get(), null);
-
-                       OneInputStreamOperatorTestHarness<String, Object> 
testHarness =
-                                       new 
OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating));
-
-                       testHarness.open();
-
-                       try {
-                               testHarness.processElement(new 
StreamRecord<>("value"));
-                               testHarness.processElement(new 
StreamRecord<>("value"));
-                               fail("This should fail with an exception");
-                       }
-                       catch (Exception e) {
-                               assertNotNull(e.getCause());
-                               assertNotNull(e.getCause().getMessage());
-                               
assertTrue(e.getCause().getMessage().contains("Test error"));
-                       }
-
-                       // (2) producer that only logs errors
-
-                       FlinkKafkaProducer09<String> producerLogging = new 
FlinkKafkaProducer09<>(
-                                       "mock_topic", new SimpleStringSchema(), 
FakeStandardProducerConfig.get(), null);
-                       producerLogging.setLogFailuresOnly(true);
-
-                       testHarness = new 
OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));
-
-                       testHarness.open();
-
-                       testHarness.processElement(new StreamRecord<>("value"));
-                       testHarness.processElement(new StreamRecord<>("value"));
-
-                       testHarness.close();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
deleted file mode 100644
index 1802e0c..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ /dev/null
@@ -1,439 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.admin.AdminUtils;
-import kafka.common.KafkaException;
-import kafka.api.PartitionMetadata;
-import kafka.network.SocketServer;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.SystemTime$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.io.FileUtils;
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import 
org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.Seq;
-
-import java.io.File;
-import java.net.BindException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-
-import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * An implementation of the KafkaServerProvider for Kafka 0.9
- */
-public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
-
-       protected static final Logger LOG = 
LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
-       private File tmpZkDir;
-       private File tmpKafkaParent;
-       private List<File> tmpKafkaDirs;
-       private List<KafkaServer> brokers;
-       private TestingServer zookeeper;
-       private String zookeeperConnectionString;
-       private String brokerConnectionString = "";
-       private Properties standardProps;
-       private Properties additionalServerProperties;
-       private boolean secureMode = false;
-       // 6 seconds is default. Seems to be too small for travis. 30 seconds
-       private String zkTimeout = "30000";
-
-       public String getBrokerConnectionString() {
-               return brokerConnectionString;
-       }
-
-       @Override
-       public Properties getStandardProperties() {
-               return standardProps;
-       }
-
-       @Override
-       public String getVersion() {
-               return "0.9";
-       }
-
-       @Override
-       public List<KafkaServer> getBrokers() {
-               return brokers;
-       }
-
-       @Override
-       public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, 
KeyedDeserializationSchema<T> readSchema, Properties props) {
-               return new FlinkKafkaConsumer09<>(topics, readSchema, props);
-       }
-
-       @Override
-       public <T> StreamSink<T> getProducerSink(
-                       String topic,
-                       KeyedSerializationSchema<T> serSchema,
-                       Properties props,
-                       KafkaPartitioner<T> partitioner) {
-               FlinkKafkaProducer09<T> prod = new 
FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
-               prod.setFlushOnCheckpoint(true);
-               return new StreamSink<>(prod);
-       }
-
-       @Override
-       public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, 
String topic, KeyedSerializationSchema<T> serSchema, Properties props, 
KafkaPartitioner<T> partitioner) {
-               FlinkKafkaProducer09<T> prod = new 
FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
-               prod.setFlushOnCheckpoint(true);
-               return stream.addSink(prod);
-       }
-
-       @Override
-       public KafkaOffsetHandler createOffsetHandler(Properties props) {
-               return new KafkaOffsetHandlerImpl(props);
-       }
-
-       @Override
-       public void restartBroker(int leaderId) throws Exception {
-               brokers.set(leaderId, getKafkaServer(leaderId, 
tmpKafkaDirs.get(leaderId)));
-       }
-
-       @Override
-       public int getLeaderToShutDown(String topic) throws Exception {
-               ZkUtils zkUtils = getZkUtils();
-               try {
-                       PartitionMetadata firstPart = null;
-                       do {
-                               if (firstPart != null) {
-                                       LOG.info("Unable to find leader. error 
code {}", firstPart.errorCode());
-                                       // not the first try. Sleep a bit
-                                       Thread.sleep(150);
-                               }
-
-                               Seq<PartitionMetadata> partitionMetadata = 
AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionsMetadata();
-                               firstPart = partitionMetadata.head();
-                       }
-                       while (firstPart.errorCode() != 0);
-
-                       return firstPart.leader().get().id();
-               } finally {
-                       zkUtils.close();
-               }
-       }
-
-       @Override
-       public int getBrokerId(KafkaServer server) {
-               return server.config().brokerId();
-       }
-
-       @Override
-       public boolean isSecureRunSupported() {
-               return true;
-       }
-
-       @Override
-       public void prepare(int numKafkaServers, Properties 
additionalServerProperties, boolean secureMode) {
-
-               //increase the timeout since in Travis ZK connection takes long 
time for secure connection.
-               if(secureMode) {
-                       //run only one kafka server to avoid multiple ZK 
connections from many instances - Travis timeout
-                       numKafkaServers = 1;
-                       zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) 
* 15);
-               }
-
-               this.additionalServerProperties = additionalServerProperties;
-               this.secureMode = secureMode;
-               File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
-               tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + 
(UUID.randomUUID().toString()));
-               assertTrue("cannot create zookeeper temp dir", 
tmpZkDir.mkdirs());
-
-               tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + 
(UUID.randomUUID().toString()));
-               assertTrue("cannot create kafka temp dir", 
tmpKafkaParent.mkdirs());
-
-               tmpKafkaDirs = new ArrayList<>(numKafkaServers);
-               for (int i = 0; i < numKafkaServers; i++) {
-                       File tmpDir = new File(tmpKafkaParent, "server-" + i);
-                       assertTrue("cannot create kafka temp dir", 
tmpDir.mkdir());
-                       tmpKafkaDirs.add(tmpDir);
-               }
-
-               zookeeper = null;
-               brokers = null;
-
-               try {
-                       LOG.info("Starting Zookeeper");
-                       zookeeper = new TestingServer(-1, tmpZkDir);
-                       zookeeperConnectionString = 
zookeeper.getConnectString();
-                       LOG.info("zookeeperConnectionString: {}", 
zookeeperConnectionString);
-
-                       LOG.info("Starting KafkaServer");
-                       brokers = new ArrayList<>(numKafkaServers);
-
-                       for (int i = 0; i < numKafkaServers; i++) {
-                               brokers.add(getKafkaServer(i, 
tmpKafkaDirs.get(i)));
-
-                               SocketServer socketServer = 
brokers.get(i).socketServer();
-                               if(secureMode) {
-                                       brokerConnectionString += 
hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, 
brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
-                               } else {
-                                       brokerConnectionString += 
hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, 
brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
-                               }
-                       }
-
-                       LOG.info("ZK and KafkaServer started.");
-               }
-               catch (Throwable t) {
-                       t.printStackTrace();
-                       fail("Test setup failed: " + t.getMessage());
-               }
-
-               LOG.info("brokerConnectionString --> {}", 
brokerConnectionString);
-
-               standardProps = new Properties();
-               standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
-               standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
-               standardProps.setProperty("group.id", "flink-tests");
-               standardProps.setProperty("enable.auto.commit", "false");
-               standardProps.setProperty("zookeeper.session.timeout.ms", 
zkTimeout);
-               standardProps.setProperty("zookeeper.connection.timeout.ms", 
zkTimeout);
-               standardProps.setProperty("auto.offset.reset", "earliest"); // 
read from the beginning. (earliest is kafka 0.9 value)
-               standardProps.setProperty("max.partition.fetch.bytes", "256"); 
// make a lot of fetches (MESSAGES MUST BE SMALLER!)
-
-       }
-
-       @Override
-       public void shutdown() {
-               for (KafkaServer broker : brokers) {
-                       if (broker != null) {
-                               broker.shutdown();
-                       }
-               }
-               brokers.clear();
-
-               if (zookeeper != null) {
-                       try {
-                               zookeeper.stop();
-                               zookeeper.close();
-                       }
-                       catch (Exception e) {
-                               LOG.warn("ZK.stop() failed", e);
-                       }
-                       zookeeper = null;
-               }
-
-               // clean up the temp spaces
-
-               if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
-                       try {
-                               FileUtils.deleteDirectory(tmpKafkaParent);
-                       }
-                       catch (Exception e) {
-                               // ignore
-                       }
-               }
-               if (tmpZkDir != null && tmpZkDir.exists()) {
-                       try {
-                               FileUtils.deleteDirectory(tmpZkDir);
-                       }
-                       catch (Exception e) {
-                               // ignore
-                       }
-               }
-       }
-
-       public ZkUtils getZkUtils() {
-               LOG.info("In getZKUtils:: zookeeperConnectionString = {}", 
zookeeperConnectionString);
-               ZkClient creator = new ZkClient(zookeeperConnectionString, 
Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
-                               
Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), 
new ZooKeeperStringSerializer());
-               return ZkUtils.apply(creator, false);
-       }
-
-       @Override
-       public void createTestTopic(String topic, int numberOfPartitions, int 
replicationFactor, Properties topicConfig) {
-               // create topic with one client
-               LOG.info("Creating topic {}", topic);
-
-               ZkUtils zkUtils = getZkUtils();
-               try {
-                       AdminUtils.createTopic(zkUtils, topic, 
numberOfPartitions, replicationFactor, topicConfig);
-               } finally {
-                       zkUtils.close();
-               }
-
-               LOG.info("Topic {} create request is successfully posted", 
topic);
-
-               // validate that the topic has been created
-               final long deadline = System.currentTimeMillis() + 
Integer.parseInt(zkTimeout);
-               do {
-                       try {
-                               if(secureMode) {
-                                       //increase wait time since in Travis ZK 
timeout occurs frequently
-                                       int wait = Integer.parseInt(zkTimeout) 
/ 100;
-                                       LOG.info("waiting for {} msecs before 
the topic {} can be checked", wait, topic);
-                                       Thread.sleep(wait);
-                               } else {
-                                       Thread.sleep(100);
-                               }
-
-                       } catch (InterruptedException e) {
-                               // restore interrupted state
-                       }
-                       // we could use AdminUtils.topicExists(zkUtils, topic) 
here, but it's results are
-                       // not always correct.
-
-                       LOG.info("Validating if the topic {} has been created 
or not", topic);
-
-                       // create a new ZK utils connection
-                       ZkUtils checkZKConn = getZkUtils();
-                       if(AdminUtils.topicExists(checkZKConn, topic)) {
-                               LOG.info("topic {} has been created 
successfully", topic);
-                               checkZKConn.close();
-                               return;
-                       }
-                       LOG.info("topic {} has not been created yet. Will check 
again...", topic);
-                       checkZKConn.close();
-               }
-               while (System.currentTimeMillis() < deadline);
-               fail("Test topic could not be created");
-       }
-
-       @Override
-       public void deleteTestTopic(String topic) {
-               ZkUtils zkUtils = getZkUtils();
-               try {
-                       LOG.info("Deleting topic {}", topic);
-
-                       ZkClient zk = new ZkClient(zookeeperConnectionString, 
Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
-                               
Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), 
new ZooKeeperStringSerializer());
-
-                       AdminUtils.deleteTopic(zkUtils, topic);
-
-                       zk.close();
-               } finally {
-                       zkUtils.close();
-               }
-       }
-
-       /**
-        * Configures and starts a single Kafka broker.
-        *
-        * <p>The broker configuration is assembled from fixed test settings, the ZooKeeper
-        * connection string and timeout of this environment, and any additional server
-        * properties that were supplied. The port comes from {@code NetUtils.getAvailablePort()}.
-        * If startup fails with a {@code KafkaException} whose cause is a {@code BindException},
-        * a new port is picked and startup is attempted again, up to five attempts in total.
-        * Any other {@code KafkaException} is rethrown unchanged.
-        *
-        * <p>Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
-        *
-        * @param brokerId  value written to the broker's "broker.id" setting
-        * @param tmpFolder directory written to the broker's "log.dir" setting
-        * @return the broker on which {@code startup()} has returned successfully
-        * @throws Exception if all attempts failed because of port conflicts
-        */
-       protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
-               final int maxAttempts = 5;
-               final String maxBytes = String.valueOf(50 * 1024 * 1024);
-
-               // every value handed to the broker configuration must be a String
-               final Properties brokerProps = new Properties();
-               brokerProps.put("advertised.host.name", KAFKA_HOST);
-               brokerProps.put("broker.id", Integer.toString(brokerId));
-               brokerProps.put("log.dir", tmpFolder.toString());
-               brokerProps.put("zookeeper.connect", zookeeperConnectionString);
-               brokerProps.put("message.max.bytes", maxBytes);
-               brokerProps.put("replica.fetch.max.bytes", maxBytes);
-
-               // for CI stability, increase zookeeper session timeout
-               brokerProps.put("zookeeper.session.timeout.ms", zkTimeout);
-               brokerProps.put("zookeeper.connection.timeout.ms", zkTimeout);
-
-               // caller-supplied settings go in last so that they override the defaults above
-               if (additionalServerProperties != null) {
-                       brokerProps.putAll(additionalServerProperties);
-               }
-
-               for (int attempt = 0; attempt < maxAttempts; attempt++) {
-                       final int port = NetUtils.getAvailablePort();
-                       brokerProps.put("port", Integer.toString(port));
-
-                       // to support secure kafka cluster
-                       if (secureMode) {
-                               LOG.info("Adding Kafka secure configurations");
-                               final String saslListener = "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + port;
-                               brokerProps.put("listeners", saslListener);
-                               brokerProps.put("advertised.listeners", saslListener);
-                               brokerProps.putAll(getSecureProperties());
-                       }
-
-                       final KafkaConfig brokerConfig = new KafkaConfig(brokerProps);
-
-                       try {
-                               final scala.Option<String> emptyOption = scala.Option.apply(null);
-                               final KafkaServer broker = new KafkaServer(brokerConfig, SystemTime$.MODULE$, emptyOption);
-                               broker.startup();
-                               return broker;
-                       }
-                       catch (KafkaException e) {
-                               if (!(e.getCause() instanceof BindException)) {
-                                       throw e;
-                               }
-                               // port conflict, retry...
-                               LOG.info("Port conflict when starting Kafka Broker. Retrying...");
-                       }
-               }
-
-               throw new Exception("Could not start Kafka after " + maxAttempts + " retries due to port conflicts.");
-       }
-
-       /**
-        * Builds the extra properties used when the environment runs in secure mode.
-        *
-        * @return a new {@code Properties} object; empty when secure mode is off, otherwise
-        *         holding the SASL_PLAINTEXT protocol settings, the Kerberos service name and
-        *         the enlarged ZooKeeper / metadata-fetch timeouts
-        */
-       public Properties getSecureProperties() {
-               final Properties secureProps = new Properties();
-               if (!secureMode) {
-                       return secureProps;
-               }
-
-               secureProps.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
-               secureProps.put("security.protocol", "SASL_PLAINTEXT");
-               secureProps.put("sasl.kerberos.service.name", "kafka");
-
-               //add special timeout for Travis
-               secureProps.setProperty("zookeeper.session.timeout.ms", zkTimeout);
-               secureProps.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
-               secureProps.setProperty("metadata.fetch.timeout.ms","120000");
-
-               return secureProps;
-       }
-
-       /**
-        * {@link KafkaOffsetHandler} that looks up committed offsets through its own
-        * {@code KafkaConsumer} instance.
-        */
-       private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
-
-               /** Consumer used only for querying committed offsets; released in {@link #close()}. */
-               private final KafkaConsumer<byte[], byte[]> offsetClient;
-
-               /**
-                * @param props configuration passed straight to the {@code KafkaConsumer} constructor
-                */
-               public KafkaOffsetHandlerImpl(Properties props) {
-                       this.offsetClient = new KafkaConsumer<>(props);
-               }
-
-               /**
-                * Looks up the committed offset of one partition.
-                *
-                * @param topicName topic of the partition
-                * @param partition partition index within the topic
-                * @return the committed offset, or {@code null} if the consumer reports none
-                */
-               @Override
-               public Long getCommittedOffset(String topicName, int partition) {
-                       final TopicPartition target = new TopicPartition(topicName, partition);
-                       final OffsetAndMetadata result = offsetClient.committed(target);
-                       if (result == null) {
-                               return null;
-                       }
-                       return result.offset();
-               }
-
-               /** Closes the underlying consumer. */
-               @Override
-               public void close() {
-                       offsetClient.close();
-               }
-       }
-
-}

Reply via email to