http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
deleted file mode 100644
index d75a15c..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ /dev/null
@@ -1,1475 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.admin.AdminUtils;
-import kafka.api.PartitionMetadata;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import kafka.server.KafkaServer;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.collections.map.LinkedMap;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
-import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
-import 
org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;
-import 
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
-import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
-import 
org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
-import 
org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
-import 
org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
-import 
org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
-import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
-import 
org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import 
org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import 
org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-import org.apache.flink.testutils.junit.RetryOnException;
-import org.apache.flink.testutils.junit.RetryRule;
-import org.apache.flink.util.Collector;
-
-import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.StringUtils;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.junit.Assert;
-
-import org.junit.Rule;
-import scala.collection.Seq;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-
-@SuppressWarnings("serial")
-public abstract class KafkaConsumerTestBase extends KafkaTestBase {
-       
-       @Rule
-       public RetryRule retryRule = new RetryRule();
-       
-       // 
------------------------------------------------------------------------
-       //  Required methods by the abstract test base
-       // 
------------------------------------------------------------------------
-
-       protected abstract <T> FlinkKafkaConsumer<T> getConsumer(
-                       List<String> topics, KeyedDeserializationSchema<T> 
deserializationSchema, Properties props);
-
-       protected <T> FlinkKafkaConsumer<T> getConsumer(
-                       List<String> topics, DeserializationSchema<T> 
deserializationSchema, Properties props) {
-               return getConsumer(topics, new 
KeyedDeserializationSchemaWrapper<T>(deserializationSchema), props);
-       }
-
-       protected <T> FlinkKafkaConsumer<T> getConsumer(
-                       String topic, DeserializationSchema<T> 
deserializationSchema, Properties props) {
-               return getConsumer(Collections.singletonList(topic), new 
KeyedDeserializationSchemaWrapper<T>(deserializationSchema), props);
-       }
-
-       protected <T> FlinkKafkaConsumer<T> getConsumer(
-                       String topic, KeyedDeserializationSchema<T> 
deserializationSchema, Properties props) {
-               return getConsumer(Collections.singletonList(topic), 
deserializationSchema, props);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Suite of Tests
-       //
-       //  The tests here are all not activated (by an @Test tag), but need
-       //  to be invoked from the extending classes. That way, the classes can
-       //  select which tests to run.
-       // 
------------------------------------------------------------------------
-
-
-       /**
-        * Test that ensures the KafkaConsumer is properly failing if the topic 
doesnt exist
-        * and a wrong broker was specified
-        *
-        * @throws Exception
-        */
-       public void runFailOnNoBrokerTest() throws Exception {
-               try {
-                       Properties properties = new Properties();
-
-                       StreamExecutionEnvironment see = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-                       see.getConfig().disableSysoutLogging();
-                       see.setNumberOfExecutionRetries(0);
-                       see.setParallelism(1);
-
-                       // use wrong ports for the consumers
-                       properties.setProperty("bootstrap.servers", 
"localhost:80");
-                       properties.setProperty("zookeeper.connect", 
"localhost:80");
-                       properties.setProperty("group.id", "test");
-                       FlinkKafkaConsumer<String> source = 
getConsumer("doesntexist", new SimpleStringSchema(), properties);
-                       DataStream<String> stream = see.addSource(source);
-                       stream.print();
-                       see.execute("No broker test");
-               } catch(RuntimeException re){
-                       Assert.assertTrue("Wrong RuntimeException thrown: " + 
StringUtils.stringifyException(re),
-                                       re.getMessage().contains("Unable to 
retrieve any partitions for the requested topics [doesntexist]"));
-               }
-       }
-       /**
-        * Test that validates that checkpointing and checkpoint notification 
works properly
-        */
-       public void runCheckpointingTest() throws Exception {
-               createTestTopic("testCheckpointing", 1, 1);
-
-               FlinkKafkaConsumer<String> source = 
getConsumer("testCheckpointing", new SimpleStringSchema(), standardProps);
-               Field pendingCheckpointsField = 
FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
-               pendingCheckpointsField.setAccessible(true);
-               LinkedMap pendingCheckpoints = (LinkedMap) 
pendingCheckpointsField.get(source);
-
-               Assert.assertEquals(0, pendingCheckpoints.size());
-               source.setRuntimeContext(new MockRuntimeContext(1, 0));
-
-               final HashMap<KafkaTopicPartition, Long> initialOffsets = new 
HashMap<>();
-               initialOffsets.put(new KafkaTopicPartition("testCheckpointing", 
0), 1337L);
-
-               // first restore
-               source.restoreState(initialOffsets);
-
-               // then open
-               source.open(new Configuration());
-               HashMap<KafkaTopicPartition, Long> state1 = 
source.snapshotState(1, 15);
-
-               assertEquals(initialOffsets, state1);
-
-               HashMap<KafkaTopicPartition, Long> state2 = 
source.snapshotState(2, 30);
-               Assert.assertEquals(initialOffsets, state2);
-
-               Assert.assertEquals(2, pendingCheckpoints.size());
-
-               source.notifyCheckpointComplete(1);
-               Assert.assertEquals(1, pendingCheckpoints.size());
-
-               source.notifyCheckpointComplete(2);
-               Assert.assertEquals(0, pendingCheckpoints.size());
-
-               source.notifyCheckpointComplete(666); // invalid checkpoint
-               Assert.assertEquals(0, pendingCheckpoints.size());
-
-               // create 500 snapshots
-               for (int i = 100; i < 600; i++) {
-                       source.snapshotState(i, 15 * i);
-               }
-               
Assert.assertEquals(FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS, 
pendingCheckpoints.size());
-
-               // commit only the second last
-               source.notifyCheckpointComplete(598);
-               Assert.assertEquals(1, pendingCheckpoints.size());
-
-               // access invalid checkpoint
-               source.notifyCheckpointComplete(590);
-
-               // and the last
-               source.notifyCheckpointComplete(599);
-               Assert.assertEquals(0, pendingCheckpoints.size());
-
-               source.close();
-
-               deleteTestTopic("testCheckpointing");
-       }
-
-       /**
-        * Tests that offsets are properly committed to ZooKeeper and initial 
offsets are read from ZooKeeper.
-        *
-        * This test is only applicable if the Flink Kafka Consumer uses the 
ZooKeeperOffsetHandler.
-        */
-       public void runOffsetInZookeeperValidationTest() throws Exception {
-               final String topicName = "testOffsetInZK";
-               final int parallelism = 3;
-
-               createTestTopic(topicName, parallelism, 1);
-
-               StreamExecutionEnvironment env1 = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env1.getConfig().disableSysoutLogging();
-               env1.enableCheckpointing(50);
-               env1.setNumberOfExecutionRetries(0);
-               env1.setParallelism(parallelism);
-
-               StreamExecutionEnvironment env2 = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env2.getConfig().disableSysoutLogging();
-               env2.enableCheckpointing(50);
-               env2.setNumberOfExecutionRetries(0);
-               env2.setParallelism(parallelism);
-
-               StreamExecutionEnvironment env3 = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env3.getConfig().disableSysoutLogging();
-               env3.enableCheckpointing(50);
-               env3.setNumberOfExecutionRetries(0);
-               env3.setParallelism(parallelism);
-
-               // write a sequence from 0 to 99 to each of the 3 partitions.
-               writeSequence(env1, topicName, 100, parallelism);
-
-               readSequence(env2, standardProps, parallelism, topicName, 100, 
0);
-
-               CuratorFramework zkClient = createZookeeperClient();
-
-               long o1 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), 
topicName, 0);
-               long o2 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), 
topicName, 1);
-               long o3 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), 
topicName, 2);
-
-               LOG.info("Got final offsets from zookeeper o1={}, o2={}, 
o3={}", o1, o2, o3);
-
-               assertTrue(o1 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 
&& o1 <= 100));
-               assertTrue(o2 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o2 >= 0 
&& o2 <= 100));
-               assertTrue(o3 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o3 >= 0 
&& o3 <= 100));
-
-               LOG.info("Manipulating offsets");
-
-               // set the offset to 50 for the three partitions
-               ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, 
standardCC.groupId(), topicName, 0, 49);
-               ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, 
standardCC.groupId(), topicName, 1, 49);
-               ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, 
standardCC.groupId(), topicName, 2, 49);
-
-               zkClient.close();
-
-               // create new env
-               readSequence(env3, standardProps, parallelism, topicName, 50, 
50);
-
-               deleteTestTopic(topicName);
-       }
-
-       public void runOffsetAutocommitTest() throws Exception {
-               final String topicName = "testOffsetAutocommit";
-               final int parallelism = 3;
-
-               createTestTopic(topicName, parallelism, 1);
-
-               StreamExecutionEnvironment env1 = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env1.getConfig().disableSysoutLogging();
-               env1.setNumberOfExecutionRetries(0);
-               env1.setParallelism(parallelism);
-
-               StreamExecutionEnvironment env2 = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               // NOTE: We are not enabling the checkpointing!
-               env2.getConfig().disableSysoutLogging();
-               env2.setNumberOfExecutionRetries(0);
-               env2.setParallelism(parallelism);
-
-
-               // write a sequence from 0 to 99 to each of the 3 partitions.
-               writeSequence(env1, topicName, 100, parallelism);
-
-
-               // the readSequence operation sleeps for 20 ms between each 
record.
-               // setting a delay of 25*20 = 500 for the commit interval makes
-               // sure that we commit roughly 3-4 times while reading, however
-               // at least once.
-               Properties readProps = new Properties();
-               readProps.putAll(standardProps);
-               readProps.setProperty("auto.commit.interval.ms", "500");
-
-               // read so that the offset can be committed to ZK
-               readSequence(env2, readProps, parallelism, topicName, 100, 0);
-
-               // get the offset
-               CuratorFramework zkClient = createZookeeperClient();
-
-               long o1 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), 
topicName, 0);
-               long o2 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), 
topicName, 1);
-               long o3 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), 
topicName, 2);
-
-               LOG.info("Got final offsets from zookeeper o1={}, o2={}, 
o3={}", o1, o2, o3);
-
-               // ensure that the offset has been committed
-               assertTrue("Offset of o1=" + o1 + " was not in range", o1 > 0 
&& o1 <= 100);
-               assertTrue("Offset of o2=" + o2 + " was not in range", o2 > 0 
&& o2 <= 100);
-               assertTrue("Offset of o3=" + o3 + " was not in range", o3 > 0 
&& o3 <= 100);
-
-               deleteTestTopic(topicName);
-       }
-
-       /**
-        * Ensure Kafka is working on both producer and consumer side.
-        * This executes a job that contains two Flink pipelines.
-        *
-        * <pre>
-        * (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> 
(validating sink)
-        * </pre>
-        * 
-        * We need to externally retry this test. We cannot let Flink's retry 
mechanism do it, because the Kafka producer
-        * does not guarantee exactly-once output. Hence a recovery would 
introduce duplicates that
-        * cause the test to fail.
-        *
-        * This test also ensures that FLINK-3156 doesn't happen again:
-        *
-        * The following situation caused a NPE in the FlinkKafkaConsumer
-        *
-        * topic-1 <-- elements are only produced into topic1.
-        * topic-2
-        *
-        * Therefore, this test is consuming as well from an empty topic.
-        *
-        */
-       @RetryOnException(times=2, 
exception=kafka.common.NotLeaderForPartitionException.class)
-       public void runSimpleConcurrentProducerConsumerTopology() throws 
Exception {
-               final String topic = "concurrentProducerConsumerTopic_" + 
UUID.randomUUID().toString();
-               final String additionalEmptyTopic = "additionalEmptyTopic_" + 
UUID.randomUUID().toString();
-
-               final int parallelism = 3;
-               final int elementsPerPartition = 100;
-               final int totalElements = parallelism * elementsPerPartition;
-
-               createTestTopic(topic, parallelism, 2);
-               createTestTopic(additionalEmptyTopic, parallelism, 1); // 
create an empty topic which will remain empty all the time
-
-               final StreamExecutionEnvironment env =
-                               
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env.setParallelism(parallelism);
-               env.enableCheckpointing(500);
-               env.setNumberOfExecutionRetries(0); // fail immediately
-               env.getConfig().disableSysoutLogging();
-
-               TypeInformation<Tuple2<Long, String>> longStringType = 
TypeInfoParser.parse("Tuple2<Long, String>");
-
-               TypeInformationSerializationSchema<Tuple2<Long, String>> 
sourceSchema =
-                               new 
TypeInformationSerializationSchema<>(longStringType, env.getConfig());
-
-               TypeInformationSerializationSchema<Tuple2<Long, String>> 
sinkSchema =
-                               new 
TypeInformationSerializationSchema<>(longStringType, env.getConfig());
-
-               // ----------- add producer dataflow ----------
-
-               DataStream<Tuple2<Long, String>> stream = env.addSource(new 
RichParallelSourceFunction<Tuple2<Long,String>>() {
-
-                       private boolean running = true;
-
-                       @Override
-                       public void run(SourceContext<Tuple2<Long, String>> 
ctx) throws InterruptedException {
-                               int cnt = 
getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
-                               int limit = cnt + elementsPerPartition;
-
-
-                               while (running && cnt < limit) {
-                                       ctx.collect(new Tuple2<>(1000L + cnt, 
"kafka-" + cnt));
-                                       cnt++;
-                                       // we delay data generation a bit so 
that we are sure that some checkpoints are
-                                       // triggered (for FLINK-3156)
-                                       Thread.sleep(50);
-                               }
-                       }
-
-                       @Override
-                       public void cancel() {
-                               running = false;
-                       }
-               });
-               stream.addSink(new 
FlinkKafkaProducer<>(brokerConnectionStrings, topic, sinkSchema));
-
-               // ----------- add consumer dataflow ----------
-
-               List<String> topics = new ArrayList<>();
-               topics.add(topic);
-               topics.add(additionalEmptyTopic);
-               FlinkKafkaConsumer<Tuple2<Long, String>> source = 
getConsumer(topics, sourceSchema, standardProps);
-
-               DataStreamSource<Tuple2<Long, String>> consuming = 
env.addSource(source).setParallelism(parallelism);
-
-               consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
-
-                       private int elCnt = 0;
-                       private BitSet validator = new BitSet(totalElements);
-
-                       @Override
-                       public void invoke(Tuple2<Long, String> value) throws 
Exception {
-                               String[] sp = value.f1.split("-");
-                               int v = Integer.parseInt(sp[1]);
-
-                               assertEquals(value.f0 - 1000, (long) v);
-
-                               assertFalse("Received tuple twice", 
validator.get(v));
-                               validator.set(v);
-                               elCnt++;
-
-                               if (elCnt == totalElements) {
-                                       // check if everything in the bitset is 
set to true
-                                       int nc;
-                                       if ((nc = validator.nextClearBit(0)) != 
totalElements) {
-                                               fail("The bitset was not set to 
1 on all elements. Next clear:"
-                                                               + nc + " Set: " 
+ validator);
-                                       }
-                                       throw new SuccessException();
-                               }
-                       }
-
-                       @Override
-                       public void close() throws Exception {
-                               super.close();
-                       }
-               }).setParallelism(1);
-
-               try {
-                       tryExecutePropagateExceptions(env, 
"runSimpleConcurrentProducerConsumerTopology");
-               }
-               catch (ProgramInvocationException | JobExecutionException e) {
-                       // look for NotLeaderForPartitionException
-                       Throwable cause = e.getCause();
-
-                       // search for nested SuccessExceptions
-                       int depth = 0;
-                       while (cause != null && depth++ < 20) {
-                               if (cause instanceof 
kafka.common.NotLeaderForPartitionException) {
-                                       throw (Exception) cause;
-                               }
-                               cause = cause.getCause();
-                       }
-                       throw e;
-               }
-
-               deleteTestTopic(topic);
-       }
-
-       /**
-        * Tests the proper consumption when having a 1:1 correspondence 
between kafka partitions and
-        * Flink sources.
-        */
-       public void runOneToOneExactlyOnceTest() throws Exception {
-
-               final String topic = "oneToOneTopic";
-               final int parallelism = 5;
-               final int numElementsPerPartition = 1000;
-               final int totalElements = parallelism * numElementsPerPartition;
-               final int failAfterElements = numElementsPerPartition / 3;
-
-               createTestTopic(topic, parallelism, 1);
-
-               DataGenerators.generateRandomizedIntegerSequence(
-                               
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-                               brokerConnectionStrings,
-                               topic, parallelism, numElementsPerPartition, 
true);
-
-               // run the topology that fails and recovers
-
-               DeserializationSchema<Integer> schema =
-                               new 
TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new 
ExecutionConfig());
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env.enableCheckpointing(500);
-               env.setParallelism(parallelism);
-               env.setNumberOfExecutionRetries(3);
-               env.getConfig().disableSysoutLogging();
-
-               FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, 
schema, standardProps);
-
-               env
-                               .addSource(kafkaSource)
-                               .map(new PartitionValidatingMapper(parallelism, 
1))
-                               .map(new 
FailingIdentityMapper<Integer>(failAfterElements))
-                               .addSink(new 
ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
-               FailingIdentityMapper.failedBefore = false;
-               tryExecute(env, "One-to-one exactly once test");
-
-               deleteTestTopic(topic);
-       }
-
-       /**
-        * Tests the proper consumption when having fewer Flink sources than 
Kafka partitions, so
-        * one Flink source will read multiple Kafka partitions.
-        */
-       public void runOneSourceMultiplePartitionsExactlyOnceTest() throws 
Exception {
-               final String topic = "oneToManyTopic";
-               final int numPartitions = 5;
-               final int numElementsPerPartition = 1000;
-               final int totalElements = numPartitions * 
numElementsPerPartition;
-               final int failAfterElements = numElementsPerPartition / 3;
-
-               final int parallelism = 2;
-
-               createTestTopic(topic, numPartitions, 1);
-
-               DataGenerators.generateRandomizedIntegerSequence(
-                               
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-                               brokerConnectionStrings,
-                               topic, numPartitions, numElementsPerPartition, 
true);
-
-               // run the topology that fails and recovers
-
-               DeserializationSchema<Integer> schema =
-                               new 
TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new 
ExecutionConfig());
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env.enableCheckpointing(500);
-               env.setParallelism(parallelism);
-               env.setNumberOfExecutionRetries(3);
-               env.getConfig().disableSysoutLogging();
-
-               FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, 
schema, standardProps);
-
-               env
-                               .addSource(kafkaSource)
-                               .map(new 
PartitionValidatingMapper(numPartitions, 3))
-                               .map(new 
FailingIdentityMapper<Integer>(failAfterElements))
-                               .addSink(new 
ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
-               FailingIdentityMapper.failedBefore = false;
-               tryExecute(env, "One-source-multi-partitions exactly once 
test");
-
-               deleteTestTopic(topic);
-       }
-
	/**
	 * Tests the proper consumption when having more Flink sources than Kafka partitions, which means
	 * that some Flink sources will read no partitions.
	 *
	 * <p>Runs an exactly-once pipeline with parallelism 8 against a 5-partition topic;
	 * a mapper fails once mid-stream and the sink validates that all elements still
	 * arrive exactly once after recovery.
	 */
	public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception {
		final String topic = "manyToOneTopic";
		final int numPartitions = 5;
		final int numElementsPerPartition = 1000;
		final int totalElements = numPartitions * numElementsPerPartition;
		// fail somewhere inside a partition's data so recovery is actually exercised
		final int failAfterElements = numElementsPerPartition / 3;

		// more source subtasks (8) than partitions (5): some subtasks read nothing
		final int parallelism = 8;

		createTestTopic(topic, numPartitions, 1);

		// pre-populate the topic with a randomized integer sequence
		DataGenerators.generateRandomizedIntegerSequence(
				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
				brokerConnectionStrings,
				topic, numPartitions, numElementsPerPartition, true);

		// run the topology that fails and recovers

		DeserializationSchema<Integer> schema =
				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());

		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
		env.enableCheckpointing(500);
		env.setParallelism(parallelism);
		env.setNumberOfExecutionRetries(3);
		env.getConfig().disableSysoutLogging();
		// flush network buffers immediately so records are not delayed by batching
		env.setBufferTimeout(0);

		FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);

		env
			.addSource(kafkaSource)
			.map(new PartitionValidatingMapper(numPartitions, 1))
			.map(new FailingIdentityMapper<Integer>(failAfterElements))
			.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);

		// reset the static "already failed" flag shared across tests in this JVM
		FailingIdentityMapper.failedBefore = false;
		tryExecute(env, "multi-source-one-partitions exactly once test");

		deleteTestTopic(topic);
	}
-       
-       
-       /**
-        * Tests that the source can be properly canceled when reading full 
partitions. 
-        */
-       public void runCancelingOnFullInputTest() throws Exception {
-               final String topic = "cancelingOnFullTopic";
-
-               final int parallelism = 3;
-               createTestTopic(topic, parallelism, 1);
-
-               // launch a producer thread
-               DataGenerators.InfiniteStringsGenerator generator =
-                               new 
DataGenerators.InfiniteStringsGenerator(brokerConnectionStrings, topic);
-               generator.start();
-
-               // launch a consumer asynchronously
-
-               final AtomicReference<Throwable> jobError = new 
AtomicReference<>();
-
-               final Runnable jobRunner = new Runnable() {
-                       @Override
-                       public void run() {
-                               try {
-                                       final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-                                       env.setParallelism(parallelism);
-                                       env.enableCheckpointing(100);
-                                       env.getConfig().disableSysoutLogging();
-
-                                       FlinkKafkaConsumer<String> source = 
getConsumer(topic, new SimpleStringSchema(), standardProps);
-
-                                       env.addSource(source).addSink(new 
DiscardingSink<String>());
-
-                                       env.execute();
-                               }
-                               catch (Throwable t) {
-                                       jobError.set(t);
-                               }
-                       }
-               };
-
-               Thread runnerThread = new Thread(jobRunner, "program runner 
thread");
-               runnerThread.start();
-
-               // wait a bit before canceling
-               Thread.sleep(2000);
-
-               // cancel
-               
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
-
-               // wait for the program to be done and validate that we failed 
with the right exception
-               runnerThread.join();
-
-               Throwable failueCause = jobError.get();
-               assertNotNull("program did not fail properly due to canceling", 
failueCause);
-               assertTrue(failueCause.getMessage().contains("Job was 
cancelled"));
-
-               if (generator.isAlive()) {
-                       generator.shutdown();
-                       generator.join();
-               }
-               else {
-                       Throwable t = generator.getError();
-                       if (t != null) {
-                               t.printStackTrace();
-                               fail("Generator failed: " + t.getMessage());
-                       } else {
-                               fail("Generator failed with no exception");
-                       }
-               }
-
-               deleteTestTopic(topic);
-       }
-
-       /**
-        * Tests that the source can be properly canceled when reading empty 
partitions. 
-        */
-       public void runCancelingOnEmptyInputTest() throws Exception {
-               final String topic = "cancelingOnEmptyInputTopic";
-
-               final int parallelism = 3;
-               createTestTopic(topic, parallelism, 1);
-
-               final AtomicReference<Throwable> error = new 
AtomicReference<>();
-
-               final Runnable jobRunner = new Runnable() {
-                       @Override
-                       public void run() {
-                               try {
-                                       final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-                                       env.setParallelism(parallelism);
-                                       env.enableCheckpointing(100);
-                                       env.getConfig().disableSysoutLogging();
-
-                                       FlinkKafkaConsumer<String> source = 
getConsumer(topic, new SimpleStringSchema(), standardProps);
-
-                                       env.addSource(source).addSink(new 
DiscardingSink<String>());
-
-                                       env.execute();
-                               }
-                               catch (Throwable t) {
-                                       error.set(t);
-                               }
-                       }
-               };
-
-               Thread runnerThread = new Thread(jobRunner, "program runner 
thread");
-               runnerThread.start();
-
-               // wait a bit before canceling
-               Thread.sleep(2000);
-
-               // cancel
-               
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
-
-               // wait for the program to be done and validate that we failed 
with the right exception
-               runnerThread.join();
-
-               Throwable failueCause = error.get();
-               assertNotNull("program did not fail properly due to canceling", 
failueCause);
-               assertTrue(failueCause.getMessage().contains("Job was 
cancelled"));
-
-               deleteTestTopic(topic);
-       }
-
	/**
	 * Tests that a job fails cleanly at deployment time when it requests more
	 * parallelism (12) than the mini cluster has slots, surfacing a
	 * {@code NoResourceAvailableException}.
	 * (NOTE(review): the previous javadoc was copy-pasted from the canceling tests.)
	 */
	public void runFailOnDeployTest() throws Exception {
		final String topic = "failOnDeployTopic";

		createTestTopic(topic, 2, 1);

		DeserializationSchema<Integer> schema =
				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());

		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
		env.setParallelism(12); // needs to be more that the mini cluster has slots
		env.getConfig().disableSysoutLogging();

		FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);

		env
				.addSource(kafkaSource)
				.addSink(new DiscardingSink<Integer>());

		try {
			env.execute();
			fail("this test should fail with an exception");
		}
		catch (ProgramInvocationException e) {

			// validate that we failed due to a NoResourceAvailableException;
			// walk the cause chain, bounded to depth 20 to avoid cycles
			Throwable cause = e.getCause();
			int depth = 0;
			boolean foundResourceException = false;

			while (cause != null && depth++ < 20) {
				if (cause instanceof NoResourceAvailableException) {
					foundResourceException = true;
					break;
				}
				cause = cause.getCause();
			}

			assertTrue("Wrong exception", foundResourceException);
		}

		deleteTestTopic(topic);
	}
-
	/**
	 * Tests recovery from an invalid committed offset: 20 messages are written, then the
	 * group's ZooKeeper offset for partition 0 is set to 1234 (far beyond the data).
	 * The consumer is expected to still deliver all 20 messages — presumably by falling
	 * back to a valid offset (TODO confirm against the offset-reset behavior).
	 */
	public void runInvalidOffsetTest() throws Exception {
		final String topic = "invalidOffsetTopic";
		final int parallelism = 1;

		// create topic
		createTestTopic(topic, parallelism, 1);

		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);

		// write 20 messages into topic:
		writeSequence(env, topic, 20, parallelism);

		// set invalid offset:
		CuratorFramework zkClient = createZookeeperClient();
		ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topic, 0, 1234);

		// read from topic
		final int valuesCount = 20;
		final int startFrom = 0;
		readSequence(env, standardCC.props().props(), parallelism, topic, valuesCount, startFrom);

		deleteTestTopic(topic);
	}
-
-       public void runConsumeMultipleTopics() throws java.lang.Exception {
-               final int NUM_TOPICS = 5;
-               final int NUM_ELEMENTS = 20;
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-
-               // create topics with content
-               final List<String> topics = new ArrayList<>();
-               for (int i = 0; i < NUM_TOPICS; i++) {
-                       final String topic = "topic-" + i;
-                       topics.add(topic);
-                       // create topic
-                       createTestTopic(topic, i + 1 /*partitions*/, 1);
-
-                       // write something
-                       writeSequence(env, topic, NUM_ELEMENTS, i + 1);
-               }
-
-               // validate getPartitionsForTopic method
-               List<KafkaTopicPartitionLeader> topicPartitions = 
FlinkKafkaConsumer082.getPartitionsForTopic(topics, standardProps);
-               Assert.assertEquals((NUM_TOPICS * (NUM_TOPICS + 1))/2, 
topicPartitions.size());
-
-               KeyedDeserializationSchema<Tuple3<Integer, Integer, String>> 
readSchema = new Tuple2WithTopicDeserializationSchema(env.getConfig());
-               DataStreamSource<Tuple3<Integer, Integer, String>> stream = 
env.addSource(getConsumer(topics, readSchema, standardProps));
-
-               stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, 
String>, Integer>() {
-                       Map<String, Integer> countPerTopic = new 
HashMap<>(NUM_TOPICS);
-                       @Override
-                       public void flatMap(Tuple3<Integer, Integer, String> 
value, Collector<Integer> out) throws Exception {
-                               Integer count = countPerTopic.get(value.f2);
-                               if (count == null) {
-                                       count = 1;
-                               } else {
-                                       count++;
-                               }
-                               countPerTopic.put(value.f2, count);
-
-                               // check map:
-                               for (Map.Entry<String, Integer> el: 
countPerTopic.entrySet()) {
-                                       if (el.getValue() < NUM_ELEMENTS) {
-                                               break; // not enough yet
-                                       }
-                                       if (el.getValue() > NUM_ELEMENTS) {
-                                               throw new 
RuntimeException("There is a failure in the test. I've read " +
-                                                               el.getValue() + 
" from topic " + el.getKey());
-                                       }
-                               }
-                               // we've seen messages from all topics
-                               throw new SuccessException();
-                       }
-               }).setParallelism(1);
-
-               tryExecute(env, "Count elements from the topics");
-
-
-               // delete all topics again
-               for (int i = 0; i < NUM_TOPICS; i++) {
-                       final String topic = "topic-" + i;
-                       deleteTestTopic(topic);
-               }
-       }
-
-       private static class Tuple2WithTopicDeserializationSchema implements 
KeyedDeserializationSchema<Tuple3<Integer, Integer, String>> {
-
-               TypeSerializer ts;
-               public Tuple2WithTopicDeserializationSchema(ExecutionConfig ec) 
{
-                       ts = TypeInfoParser.parse("Tuple2<Integer, 
Integer>").createSerializer(ec);
-               }
-
-               @Override
-               public Tuple3<Integer, Integer, String> deserialize(byte[] 
messageKey, byte[] message, String topic, long offset) throws IOException {
-                       Tuple2<Integer, Integer> t2 = (Tuple2<Integer, 
Integer>) ts.deserialize(new ByteArrayInputView(message));
-                       return new Tuple3<>(t2.f0, t2.f1, topic);
-               }
-
-               @Override
-               public boolean isEndOfStream(Tuple3<Integer, Integer, String> 
nextElement) {
-                       return false;
-               }
-
-               @Override
-               public TypeInformation<Tuple3<Integer, Integer, String>> 
getProducedType() {
-                       return TypeInfoParser.parse("Tuple3<Integer, Integer, 
String>");
-               }
-       }
-
	/**
	 * Test Flink's Kafka integration also with very big records (30MB)
	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
	 *
	 * <p>A source emits 10 records of 15–30MB plus a final marker record with key -1;
	 * the sink verifies it sees exactly 11 records and throws {@code SuccessException}.
	 */
	public void runBigRecordTestTopology() throws Exception {

		final String topic = "bigRecordTestTopic";
		final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space

		createTestTopic(topic, parallelism, 1);

		final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");

		// NOTE(review): the two schemas are identical; "serSchema" is used for the
		// consumer and "deserSchema" for the producer — the names are swapped but
		// the behavior is unaffected.
		final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
				new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());

		final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> deserSchema =
				new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());

		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
		env.setNumberOfExecutionRetries(0);
		env.getConfig().disableSysoutLogging();
		env.enableCheckpointing(100);
		env.setParallelism(parallelism);

		// add consuming topology: raise fetch limits to 40MB so 30MB records fit
		Properties consumerProps = new Properties();
		consumerProps.putAll(standardProps);
		consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 40));
		consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 40)); // for the new fetcher
		consumerProps.setProperty("queued.max.message.chunks", "1");

		FlinkKafkaConsumer<Tuple2<Long, byte[]>> source = getConsumer(topic, serSchema, consumerProps);
		DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);

		consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {

			// number of records seen so far
			private int elCnt = 0;

			@Override
			public void invoke(Tuple2<Long, byte[]> value) throws Exception {
				elCnt++;
				if (value.f0 == -1) {
					// end marker: we should have seen 11 elements now.
					if (elCnt == 11) {
						throw new SuccessException();
					} else {
						throw new RuntimeException("There have been "+elCnt+" elements");
					}
				}
				if (elCnt > 10) {
					throw new RuntimeException("More than 10 elements seen: "+elCnt);
				}
			}
		});

		// add producing topology; allow 30MB requests
		Properties producerProps = new Properties();
		producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 30));
		producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);

		DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {

			private boolean running;

			@Override
			public void open(Configuration parameters) throws Exception {
				super.open(parameters);
				running = true;
			}

			@Override
			public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
				Random rnd = new Random();
				long cnt = 0;
				int fifteenMb = 1024 * 1024 * 15;

				while (running) {
					// payload of 15MB plus up to 15MB more (max ~30MB)
					byte[] wl = new byte[fifteenMb + rnd.nextInt(fifteenMb)];
					ctx.collect(new Tuple2<>(cnt++, wl));

					Thread.sleep(100);

					if (cnt == 10) {
						// signal end
						ctx.collect(new Tuple2<>(-1L, new byte[]{1}));
						break;
					}
				}
			}

			@Override
			public void cancel() {
				running = false;
			}
		});

		stream.addSink(new FlinkKafkaProducer<>(topic, deserSchema, producerProps));

		tryExecute(env, "big topology test");

		deleteTestTopic(topic);

	}
-
-       
	/**
	 * Tests exactly-once behavior across a broker failure: the partition leader is
	 * looked up via ZooKeeper, a mapper kills that broker mid-stream, and the job must
	 * recover and deliver every element exactly once. The killed broker is restarted
	 * at the end so later tests see a full cluster.
	 */
	public void runBrokerFailureTest() throws Exception {
		final String topic = "brokerFailureTestTopic";

		final int parallelism = 2;
		final int numElementsPerPartition = 1000;
		final int totalElements = parallelism * numElementsPerPartition;
		// kill the broker somewhere inside a partition's data
		final int failAfterElements = numElementsPerPartition / 3;

		// replication factor 2 so the topic survives losing one broker
		createTestTopic(topic, parallelism, 2);

		DataGenerators.generateRandomizedIntegerSequence(
				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
				brokerConnectionStrings,
				topic, parallelism, numElementsPerPartition, true);

		// find leader to shut down: poll ZooKeeper until the first partition reports no error
		PartitionMetadata firstPart = null;
		{
			ZkClient zkClient = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
					standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());

			do {
				if (firstPart != null) {
					LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
					// not the first try. Sleep a bit
					Thread.sleep(150);
				}

				Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
				firstPart = partitionMetadata.head();
			}
			while (firstPart.errorCode() != 0);
			zkClient.close();
		}

		final kafka.cluster.Broker leaderToShutDown = firstPart.leader().get();
		final String leaderToShutDownConnection =
				NetUtils.hostAndPortToUrlString(leaderToShutDown.host(), leaderToShutDown.port());

		final int leaderIdToShutDown = firstPart.leader().get().id();
		LOG.info("Leader to shutdown {}", leaderToShutDown);

		// run the topology that fails and recovers

		DeserializationSchema<Integer> schema =
				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());

		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
		env.setParallelism(parallelism);
		env.enableCheckpointing(500);
		env.setNumberOfExecutionRetries(3);
		env.getConfig().disableSysoutLogging();

		FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);

		// BrokerKillingMapper shuts the leader down after failAfterElements records;
		// the sink then validates exactly-once delivery across the failover
		env
				.addSource(kafkaSource)
				.map(new PartitionValidatingMapper(parallelism, 1))
				.map(new BrokerKillingMapper<Integer>(leaderToShutDownConnection, failAfterElements))
				.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);

		// reset the static "already killed" flag shared across tests in this JVM
		BrokerKillingMapper.killedLeaderBefore = false;
		tryExecute(env, "One-to-one exactly once test");

		// start a new broker in place of the one we killed, for subsequent tests
		brokers.set(leaderIdToShutDown, getKafkaServer(leaderIdToShutDown, tmpKafkaDirs.get(leaderIdToShutDown), kafkaHost, zookeeperConnectionString));

	}
-
	/**
	 * Writes 5000 (key, POJO) records to Kafka with a key/value serialization schema —
	 * making every second key null — then reads them back, verifying order, values and
	 * proper null-key round-tripping.
	 */
	public void runKeyValueTest() throws Exception {
		final String topic = "keyvaluetest";
		createTestTopic(topic, 1, 1);
		final int ELEMENT_COUNT = 5000;

		// ----------- Write some data into Kafka -------------------

		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
		env.setParallelism(1);
		env.setNumberOfExecutionRetries(3);
		env.getConfig().disableSysoutLogging();

		DataStream<Tuple2<Long, PojoValue>> kvStream = env.addSource(new SourceFunction<Tuple2<Long, PojoValue>>() {
			@Override
			public void run(SourceContext<Tuple2<Long, PojoValue>> ctx) throws Exception {
				// fixed seed for reproducible data
				Random rnd = new Random(1337);
				for (long i = 0; i < ELEMENT_COUNT; i++) {
					PojoValue pojo = new PojoValue();
					pojo.when = new Date(rnd.nextLong());
					pojo.lon = rnd.nextLong();
					pojo.lat = i;
					// make every second key null to ensure proper "null" serialization
					Long key = (i % 2 == 0) ? null : i;
					ctx.collect(new Tuple2<>(key, pojo));
				}
			}
			@Override
			public void cancel() {
			}
		});

		KeyedSerializationSchema<Tuple2<Long, PojoValue>> schema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
		kvStream.addSink(new FlinkKafkaProducer<>(topic, schema,
				FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings)));
		env.execute("Write KV to Kafka");

		// ----------- Read the data again -------------------

		env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
		env.setParallelism(1);
		env.setNumberOfExecutionRetries(3);
		env.getConfig().disableSysoutLogging();

		KeyedDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());

		DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(getConsumer(topic, readSchema, standardProps));
		fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long,PojoValue>, Object>() {
			long counter = 0;
			@Override
			public void flatMap(Tuple2<Long, PojoValue> value, Collector<Object> out) throws Exception {
				// the elements should be in order.
				Assert.assertTrue("Wrong value " + value.f1.lat, value.f1.lat == counter );
				if (value.f1.lat % 2 == 0) {
					// even elements were written with a null key
					Assert.assertNull("key was not null", value.f0);
				} else {
					Assert.assertTrue("Wrong value " + value.f0, value.f0 == counter);
				}
				counter++;
				if (counter == ELEMENT_COUNT) {
					// we got the right number of elements
					throw new SuccessException();
				}
			}
		});

		tryExecute(env, "Read KV from Kafka");

		deleteTestTopic(topic);
	}
-
	/**
	 * Test value type for the key/value round-trip tests. Public fields and the public
	 * no-argument constructor make this class eligible for Flink's POJO serialization.
	 */
	public static class PojoValue {
		public Date when;
		public long lon;
		public long lat;
		public PojoValue() {}
	}
-
	/**
	 * Writes 300 records with random keys and {@code null} values ("delete" markers in
	 * Kafka's log-compaction terminology) and verifies that reading them back yields
	 * records with {@code null} values rather than dropping or mangling them.
	 */
	public void runAllDeletesTest() throws Exception {
		final String topic = "alldeletestest";
		createTestTopic(topic, 1, 1);
		final int ELEMENT_COUNT = 300;

		// ----------- Write some data into Kafka -------------------

		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
		env.setParallelism(1);
		env.setNumberOfExecutionRetries(3);
		env.getConfig().disableSysoutLogging();

		DataStream<Tuple2<byte[], PojoValue>> kvStream = env.addSource(new SourceFunction<Tuple2<byte[], PojoValue>>() {
			@Override
			public void run(SourceContext<Tuple2<byte[], PojoValue>> ctx) throws Exception {
				// fixed seed for reproducible keys
				Random rnd = new Random(1337);
				for (long i = 0; i < ELEMENT_COUNT; i++) {
					final byte[] key = new byte[200];
					rnd.nextBytes(key);
					// null value = delete marker
					ctx.collect(new Tuple2<>(key, (PojoValue) null));
				}
			}
			@Override
			public void cancel() {
			}
		});

		TypeInformationKeyValueSerializationSchema<byte[], PojoValue> schema = new TypeInformationKeyValueSerializationSchema<>(byte[].class, PojoValue.class, env.getConfig());

		kvStream.addSink(new FlinkKafkaProducer<>(topic, schema,
				FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings)));
		env.execute("Write deletes to Kafka");

		// ----------- Read the data again -------------------

		env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
		env.setParallelism(1);
		env.setNumberOfExecutionRetries(3);
		env.getConfig().disableSysoutLogging();

		DataStream<Tuple2<byte[], PojoValue>> fromKafka = env.addSource(getConsumer(topic, schema, standardProps));

		fromKafka.flatMap(new RichFlatMapFunction<Tuple2<byte[], PojoValue>, Object>() {
			long counter = 0;
			@Override
			public void flatMap(Tuple2<byte[], PojoValue> value, Collector<Object> out) throws Exception {
				// ensure that deleted messages are passed as nulls
				assertNull(value.f1);
				counter++;
				if (counter == ELEMENT_COUNT) {
					// we got the right number of elements
					throw new SuccessException();
				}
			}
		});

		tryExecute(env, "Read deletes from Kafka");

		deleteTestTopic(topic);
	}
-
-       // 
------------------------------------------------------------------------
-       //  Reading writing test data sets
-       // 
------------------------------------------------------------------------
-
-       private void readSequence(StreamExecutionEnvironment env, Properties cc,
-                                                               final int 
sourceParallelism,
-                                                               final String 
topicName,
-                                                               final int 
valuesCount, final int startFrom) throws Exception {
-
-               final int finalCount = valuesCount * sourceParallelism;
-
-               final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType 
= TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
-               final TypeInformationSerializationSchema<Tuple2<Integer, 
Integer>> deser =
-                               new 
TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
-
-               // create the consumer
-               FlinkKafkaConsumer<Tuple2<Integer, Integer>> consumer = 
getConsumer(topicName, deser, cc);
-
-               DataStream<Tuple2<Integer, Integer>> source = env
-                               
.addSource(consumer).setParallelism(sourceParallelism)
-                               .map(new ThrottledMapper<Tuple2<Integer, 
Integer>>(20)).setParallelism(sourceParallelism);
-
-               // verify data
-               source.flatMap(new RichFlatMapFunction<Tuple2<Integer, 
Integer>, Integer>() {
-
-                       private int[] values = new int[valuesCount];
-                       private int count = 0;
-
-                       @Override
-                       public void flatMap(Tuple2<Integer, Integer> value, 
Collector<Integer> out) throws Exception {
-                               values[value.f1 - startFrom]++;
-                               count++;
-
-                               // verify if we've seen everything
-                               if (count == finalCount) {
-                                       for (int i = 0; i < values.length; i++) 
{
-                                               int v = values[i];
-                                               if (v != sourceParallelism) {
-                                                       printTopic(topicName, 
valuesCount, deser);
-                                                       throw new 
RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " 
array=" + Arrays.toString(values));
-                                               }
-                                       }
-                                       // test has passed
-                                       throw new SuccessException();
-                               }
-                       }
-
-               }).setParallelism(1);
-
-               tryExecute(env, "Read data from Kafka");
-
-               LOG.info("Successfully read sequence for verification");
-       }
-
-       private static void writeSequence(StreamExecutionEnvironment env, 
String topicName, final int numElements, int parallelism) throws Exception {
-
-               LOG.info("\n===================================\n== Writing 
sequence of "+numElements+" into "+topicName+" with 
p="+parallelism+"\n===================================");
-               TypeInformation<Tuple2<Integer, Integer>> resultType = 
TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
-               DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new 
RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-
-                       private boolean running = true;
-
-                       @Override
-                       public void run(SourceContext<Tuple2<Integer, Integer>> 
ctx) throws Exception {
-                               int cnt = 0;
-                               int partition = 
getRuntimeContext().getIndexOfThisSubtask();
-
-                               while (running && cnt < numElements) {
-                                       ctx.collect(new Tuple2<>(partition, 
cnt));
-                                       cnt++;
-                               }
-                       }
-
-                       @Override
-                       public void cancel() {
-                               running = false;
-                       }
-               }).setParallelism(parallelism);
-               
-               stream.addSink(new FlinkKafkaProducer<>(topicName,
-                               new 
TypeInformationSerializationSchema<>(resultType, env.getConfig()),
-                               
FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings),
-                               new Tuple2Partitioner(parallelism)
-               )).setParallelism(parallelism);
-
-               env.execute("Write sequence");
-
-               LOG.info("Finished writing sequence");
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Debugging utilities
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Read topic to list, only using Kafka code.
-        */
-       private static List<MessageAndMetadata<byte[], byte[]>> 
readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
-               ConsumerConnector consumerConnector = 
Consumer.createJavaConsumerConnector(config);
-               // we request only one stream per consumer instance. Kafka will 
make sure that each consumer group
-               // will see each message only once.
-               Map<String,Integer> topicCountMap = 
Collections.singletonMap(topicName, 1);
-               Map<String, List<KafkaStream<byte[], byte[]>>> streams = 
consumerConnector.createMessageStreams(topicCountMap);
-               if (streams.size() != 1) {
-                       throw new RuntimeException("Expected only one message 
stream but got "+streams.size());
-               }
-               List<KafkaStream<byte[], byte[]>> kafkaStreams = 
streams.get(topicName);
-               if (kafkaStreams == null) {
-                       throw new RuntimeException("Requested stream not 
available. Available streams: "+streams.toString());
-               }
-               if (kafkaStreams.size() != 1) {
-                       throw new RuntimeException("Requested 1 stream from 
Kafka, bot got "+kafkaStreams.size()+" streams");
-               }
-               LOG.info("Opening Consumer instance for topic '{}' on group 
'{}'", topicName, config.groupId());
-               ConsumerIterator<byte[], byte[]> iteratorToRead = 
kafkaStreams.get(0).iterator();
-
-               List<MessageAndMetadata<byte[], byte[]>> result = new 
ArrayList<>();
-               int read = 0;
-               while(iteratorToRead.hasNext()) {
-                       read++;
-                       result.add(iteratorToRead.next());
-                       if (read == stopAfter) {
-                               LOG.info("Read "+read+" elements");
-                               return result;
-                       }
-               }
-               return result;
-       }
-
-       private static void printTopic(String topicName, ConsumerConfig config,
-                                                               
DeserializationSchema<?> deserializationSchema,
-                                                               int stopAfter) 
throws IOException {
-
-               List<MessageAndMetadata<byte[], byte[]>> contents = 
readTopicToList(topicName, config, stopAfter);
-               LOG.info("Printing contents of topic {} in consumer grouo {}", 
topicName, config.groupId());
-
-               for (MessageAndMetadata<byte[], byte[]> message: contents) {
-                       Object out = 
deserializationSchema.deserialize(message.message());
-                       LOG.info("Message: partition: {} offset: {} msg: {}", 
message.partition(), message.offset(), out.toString());
-               }
-       }
-
-       private static void printTopic(String topicName, int 
elements,DeserializationSchema<?> deserializer) 
-                       throws IOException
-       {
-               // write the sequence to log for debugging purposes
-               Properties stdProps = standardCC.props().props();
-               Properties newProps = new Properties(stdProps);
-               newProps.setProperty("group.id", "topic-printer"+ 
UUID.randomUUID().toString());
-               newProps.setProperty("auto.offset.reset", "smallest");
-               newProps.setProperty("zookeeper.connect", 
standardCC.zkConnect());
-
-               ConsumerConfig printerConfig = new ConsumerConfig(newProps);
-               printTopic(topicName, printerConfig, deserializer, elements);
-       }
-
-
-       public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
-                       implements Checkpointed<Integer>, CheckpointNotifier {
-
-               private static final long serialVersionUID = 
6334389850158707313L;
-
-               public static volatile boolean killedLeaderBefore;
-               public static volatile boolean hasBeenCheckpointedBeforeFailure;
-               
-               private final String leaderToShutDown;
-               private final int failCount;
-               private int numElementsTotal;
-
-               private boolean failer;
-               private boolean hasBeenCheckpointed;
-
-
-               public BrokerKillingMapper(String leaderToShutDown, int 
failCount) {
-                       this.leaderToShutDown = leaderToShutDown;
-                       this.failCount = failCount;
-               }
-
-               @Override
-               public void open(Configuration parameters) {
-                       failer = getRuntimeContext().getIndexOfThisSubtask() == 
0;
-               }
-
-               @Override
-               public T map(T value) throws Exception {
-                       numElementsTotal++;
-                       
-                       if (!killedLeaderBefore) {
-                               Thread.sleep(10);
-                               
-                               if (failer && numElementsTotal >= failCount) {
-                                       // shut down a Kafka broker
-                                       KafkaServer toShutDown = null;
-                                       for (KafkaServer kafkaServer : brokers) 
{
-                                               String connectionUrl = 
-                                                               
NetUtils.hostAndPortToUrlString(
-                                                                               
kafkaServer.config().advertisedHostName(),
-                                                                               
kafkaServer.config().advertisedPort());
-                                               if 
(leaderToShutDown.equals(connectionUrl)) {
-                                                       toShutDown = 
kafkaServer;
-                                                       break;
-                                               }
-                                       }
-       
-                                       if (toShutDown == null) {
-                                               StringBuilder listOfBrokers = 
new StringBuilder();
-                                               for (KafkaServer kafkaServer : 
brokers) {
-                                                       listOfBrokers.append(
-                                                                       
NetUtils.hostAndPortToUrlString(
-                                                                               
        kafkaServer.config().advertisedHostName(),
-                                                                               
        kafkaServer.config().advertisedPort()));
-                                                       listOfBrokers.append(" 
; ");
-                                               }
-                                               
-                                               throw new Exception("Cannot 
find broker to shut down: " + leaderToShutDown
-                                                               + " ; available 
brokers: " + listOfBrokers.toString());
-                                       }
-                                       else {
-                                               
hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
-                                               killedLeaderBefore = true;
-                                               toShutDown.shutdown();
-                                       }
-                               }
-                       }
-                       return value;
-               }
-
-               @Override
-               public void notifyCheckpointComplete(long checkpointId) {
-                       hasBeenCheckpointed = true;
-               }
-
-               @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return numElementsTotal;
-               }
-
-               @Override
-               public void restoreState(Integer state) {
-                       this.numElementsTotal = state;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
deleted file mode 100644
index 07e650a..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Properties;
-
-
-public class KafkaITCase extends KafkaConsumerTestBase {
-       
-       @Override
-       protected <T> FlinkKafkaConsumer<T> getConsumer(List<String> topics, 
KeyedDeserializationSchema<T> deserializationSchema, Properties props) {
-               return new FlinkKafkaConsumer082<>(topics, 
deserializationSchema, props);
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  Suite of Tests
-       // 
------------------------------------------------------------------------
-       
-       @Test
-       public void testCheckpointing() throws Exception {
-               runCheckpointingTest();
-       }
-
-       @Test()
-       public void testFailOnNoBroker() throws Exception {
-               runFailOnNoBrokerTest();
-       }
-
-       @Test
-       public void testOffsetInZookeeper() throws Exception {
-               runOffsetInZookeeperValidationTest();
-       }
-
-       @Test
-       public void testOffsetAutocommitTest() throws Exception {
-               runOffsetAutocommitTest();
-       }
-
-
-       @Test
-       public void testConcurrentProducerConsumerTopology() throws Exception {
-               runSimpleConcurrentProducerConsumerTopology();
-       }
-
-       @Test(timeout = 60000)
-       public void testKeyValueSupport() throws Exception {
-               runKeyValueTest();
-       }
-
-       // --- canceling / failures ---
-       
-       @Test
-       public void testCancelingEmptyTopic() throws Exception {
-               runCancelingOnEmptyInputTest();
-       }
-
-       @Test
-       public void testCancelingFullTopic() throws Exception {
-               runCancelingOnFullInputTest();
-       }
-
-       @Test
-       public void testFailOnDeploy() throws Exception {
-               runFailOnDeployTest();
-       }
-
-       @Test
-       public void testInvalidOffset() throws Exception {
-               runInvalidOffsetTest();
-       }
-
-       // --- source to partition mappings and exactly once ---
-       
-       @Test
-       public void testOneToOneSources() throws Exception {
-               runOneToOneExactlyOnceTest();
-       }
-
-       @Test
-       public void testOneSourceMultiplePartitions() throws Exception {
-               runOneSourceMultiplePartitionsExactlyOnceTest();
-       }
-
-       @Test
-       public void testMultipleSourcesOnePartition() throws Exception {
-               runMultipleSourcesOnePartitionExactlyOnceTest();
-       }
-
-       // --- broker failure ---
-
-       @Test
-       public void testBrokerFailure() throws Exception {
-               runBrokerFailureTest();
-       }
-
-       // --- special executions ---
-       
-       @Test
-       public void testBigRecordJob() throws Exception {
-               runBigRecordTestTopology();
-       }
-
-       @Test
-       public void testMultipleTopics() throws Exception {
-               runConsumeMultipleTopics();
-       }
-
-       @Test
-       public void testAllDeletes() throws Exception {
-               runAllDeletesTest();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
deleted file mode 100644
index 72d2772..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.utils.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaLocalSystemTime implements Time {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(KafkaLocalSystemTime.class);
-
-       @Override
-       public long milliseconds() {
-               return System.currentTimeMillis();
-       }
-
-       @Override
-       public long nanoseconds() {
-               return System.nanoTime();
-       }
-
-       @Override
-       public void sleep(long ms) {
-               try {
-                       Thread.sleep(ms);
-               } catch (InterruptedException e) {
-                       LOG.warn("Interruption", e);
-               }
-       }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
deleted file mode 100644
index f4c1899..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
-import 
org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-
-import org.junit.Test;
-
-import java.io.Serializable;
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-@SuppressWarnings("serial")
-public class KafkaProducerITCase extends KafkaTestBase {
-
-
-       /**
-        * 
-        * <pre>
-        *             +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) 
--+
-        *            /                  |                                      
 \
-        *           /                   |                                      
  \
-        * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) 
-----+-> (sink)
-        *           \                   |                                      
  /
-        *            \                  |                                      
 /
-        *             +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) 
--+
-        * </pre>
-        * 
-        * The mapper validates that the values come consistently from the 
correct Kafka partition.
-        * 
-        * The final sink validates that there are no duplicates and that all 
partitions are present.
-        */
-       @Test
-       public void testCustomPartitioning() {
-               try {
-                       LOG.info("Starting 
KafkaProducerITCase.testCustomPartitioning()");
-
-                       final String topic = "customPartitioningTestTopic";
-                       final int parallelism = 3;
-                       
-                       createTestTopic(topic, parallelism, 1);
-
-                       TypeInformation<Tuple2<Long, String>> longStringInfo = 
TypeInfoParser.parse("Tuple2<Long, String>");
-
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-                       env.setNumberOfExecutionRetries(0);
-                       env.getConfig().disableSysoutLogging();
-
-                       TypeInformationSerializationSchema<Tuple2<Long, 
String>> serSchema =
-                                       new 
TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
-
-                       TypeInformationSerializationSchema<Tuple2<Long, 
String>> deserSchema =
-                                       new 
TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
-
-                       // ------ producing topology ---------
-                       
-                       // source has DOP 1 to make sure it generates no 
duplicates
-                       DataStream<Tuple2<Long, String>> stream = 
env.addSource(new SourceFunction<Tuple2<Long, String>>() {
-
-                               private boolean running = true;
-
-                               @Override
-                               public void run(SourceContext<Tuple2<Long, 
String>> ctx) throws Exception {
-                                       long cnt = 0;
-                                       while (running) {
-                                               ctx.collect(new Tuple2<Long, 
String>(cnt, "kafka-" + cnt));
-                                               cnt++;
-                                       }
-                               }
-
-                               @Override
-                               public void cancel() {
-                                       running = false;
-                               }
-                       })
-                       .setParallelism(1);
-                       
-                       // sink partitions into 
-                       stream.addSink(new FlinkKafkaProducer<>(topic, 
serSchema, 
FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings), new 
CustomPartitioner(parallelism)))
-                       .setParallelism(parallelism);
-
-                       // ------ consuming topology ---------
-                       
-                       FlinkKafkaConsumer<Tuple2<Long, String>> source = 
-                                       new 
FlinkKafkaConsumer<>(Collections.singletonList(topic), deserSchema, 
standardProps,
-                                                       
FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
-                                                       
FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
-                       
-                       env.addSource(source).setParallelism(parallelism)
-
-                                       // mapper that validates partitioning 
and maps to partition
-                                       .map(new RichMapFunction<Tuple2<Long, 
String>, Integer>() {
-                                               
-                                               private int ourPartition = -1;
-                                               @Override
-                                               public Integer map(Tuple2<Long, 
String> value) {
-                                                       int partition = 
value.f0.intValue() % parallelism;
-                                                       if (ourPartition != -1) 
{
-                                                               
assertEquals("inconsistent partitioning", ourPartition, partition);
-                                                       } else {
-                                                               ourPartition = 
partition;
-                                                       }
-                                                       return partition;
-                                               }
-                                       }).setParallelism(parallelism)
-                                       
-                                       .addSink(new SinkFunction<Integer>() {
-                                               
-                                               private int[] 
valuesPerPartition = new int[parallelism];
-                                               
-                                               @Override
-                                               public void invoke(Integer 
value) throws Exception {
-                                                       
valuesPerPartition[value]++;
-                                                       
-                                                       boolean missing = false;
-                                                       for (int i : 
valuesPerPartition) {
-                                                               if (i < 100) {
-                                                                       missing 
= true;
-                                                                       break;
-                                                               }
-                                                       }
-                                                       if (!missing) {
-                                                               throw new 
SuccessException();
-                                                       }
-                                               }
-                                       }).setParallelism(1);
-                       
-                       tryExecute(env, "custom partitioning test");
-
-                       deleteTestTopic(topic);
-                       
-                       LOG.info("Finished 
KafkaProducerITCase.testCustomPartitioning()");
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       
-       // 
------------------------------------------------------------------------
-
-       public static class CustomPartitioner extends KafkaPartitioner 
implements Serializable {
-
-               private final int expectedPartitions;
-
-               public CustomPartitioner(int expectedPartitions) {
-                       this.expectedPartitions = expectedPartitions;
-               }
-
-               @Override
-               public int partition(Object key, int numPartitions) {
-                       @SuppressWarnings("unchecked")
-                       Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
-                       
-                       assertEquals(expectedPartitions, numPartitions);
-                       
-                       return (int) (tuple.f0 % numPartitions);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
deleted file mode 100644
index 531b219..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
-
-import static org.mockito.Mockito.*;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-import static org.junit.Assert.*;
-
/**
 * Unit test for {@link FlinkKafkaProducer} error handling. Uses PowerMock's
 * {@code whenNew} to intercept the internal {@link KafkaProducer} construction
 * and substitute a mock whose send callback always reports a failure, then
 * checks both error-handling modes of the sink:
 * propagate-as-exception (default) and log-only.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest(FlinkKafkaProducer.class)
public class KafkaProducerTest extends TestLogger {
	
	/**
	 * Verifies that an asynchronous send error is (1) rethrown from a later
	 * {@code invoke()} call by default, and (2) swallowed (only logged) when
	 * {@code setLogFailuresOnly(true)} is set.
	 */
	@Test
	@SuppressWarnings("unchecked")
	public void testPropagateExceptions() {
		try {
			// mock kafka producer
			KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
			
			// partition setup: report a single partition (id 42) for any topic queried
			when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
					Arrays.asList(new PartitionInfo("mock_topic", 42, null, null, null)));

			// failure when trying to send an element: every send() immediately
			// completes its callback with an exception, simulating a broker-side error
			when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
				.thenAnswer(new Answer<Future<RecordMetadata>>() {
					@Override
					public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
						Callback callback = (Callback) invocation.getArguments()[1];
						callback.onCompletion(null, new Exception("Test error"));
						return null;
					}
				});
			
			// make sure the FlinkKafkaProducer instantiates our mock producer
			// (this is why the test runs under PowerMockRunner with @PrepareForTest)
			whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
			
			// (1) producer that propagates errors
			
			FlinkKafkaProducer<String> producerPropagating = new FlinkKafkaProducer<String>(
					"mock_topic", new SimpleStringSchema(), new Properties(), null);

			producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3));
			producerPropagating.open(new Configuration());
			
			try {
				// the first invoke triggers the failing async callback; the second
				// invoke is where the recorded error is detected and rethrown
				producerPropagating.invoke("value");
				producerPropagating.invoke("value");
				fail("This should fail with an exception");
			}
			catch (Exception e) {
				assertNotNull(e.getCause());
				assertNotNull(e.getCause().getMessage());
				assertTrue(e.getCause().getMessage().contains("Test error"));
			}

			// (2) producer that only logs errors
			
			FlinkKafkaProducer<String> producerLogging = new FlinkKafkaProducer<String>(
					"mock_topic", new SimpleStringSchema(), new Properties(), null);
			producerLogging.setLogFailuresOnly(true);
			
			producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3));
			producerLogging.open(new Configuration());

			// with log-failures-only, the same error must NOT surface as an exception
			producerLogging.invoke("value");
			producerLogging.invoke("value");
		}
		catch (Exception e) {
			e.printStackTrace();
			fail(e.getMessage());
		}
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
deleted file mode 100644
index c9ac75b..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.admin.AdminUtils;
-import kafka.common.KafkaException;
-import kafka.consumer.ConsumerConfig;
-import kafka.network.SocketServer;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-
-import org.I0Itec.zkclient.ZkClient;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
-import 
org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.BindException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * The base for the Kafka tests. It brings up:
- * <ul>
- *     <li>A ZooKeeper mini cluster</li>
- *     <li>Three Kafka Brokers (mini clusters)</li>
- *     <li>A Flink mini cluster</li>
- * </ul>
- * 
- * <p>Code in this test is based on the following GitHub repository:
- * <a href="https://github.com/sakserv/hadoop-mini-clusters";>
- *   https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed),
- * as per commit <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i></p>
- */
-@SuppressWarnings("serial")
-public abstract class KafkaTestBase extends TestLogger {
-
-       protected static final Logger LOG = 
LoggerFactory.getLogger(KafkaTestBase.class);
-       
-       protected static final int NUMBER_OF_KAFKA_SERVERS = 3;
-
-       protected static String zookeeperConnectionString;
-
-       protected static File tmpZkDir;
-
-       protected static File tmpKafkaParent;
-
-       protected static TestingServer zookeeper;
-       protected static List<KafkaServer> brokers;
-       protected static String brokerConnectionStrings = "";
-
-       protected static ConsumerConfig standardCC;
-       protected static Properties standardProps;
-       
-       protected static ForkableFlinkMiniCluster flink;
-
-       protected static int flinkPort;
-
-       protected static FiniteDuration timeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
-
-       protected static List<File> tmpKafkaDirs;
-
-       protected static String kafkaHost = "localhost";
-
-       // 
------------------------------------------------------------------------
-       //  Setup and teardown of the mini clusters
-       // 
------------------------------------------------------------------------
-       
-       @BeforeClass
-       public static void prepare() throws IOException {
-               
LOG.info("-------------------------------------------------------------------------");
-               LOG.info("    Starting KafkaITCase ");
-               
LOG.info("-------------------------------------------------------------------------");
-               
-               LOG.info("Starting KafkaITCase.prepare()");
-               
-               File tempDir = new File(System.getProperty("java.io.tmpdir"));
-               
-               tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + 
(UUID.randomUUID().toString()));
-               assertTrue("cannot create zookeeper temp dir", 
tmpZkDir.mkdirs());
-
-               tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + 
(UUID.randomUUID().toString()));
-               assertTrue("cannot create kafka temp dir", 
tmpKafkaParent.mkdirs());
-
-               tmpKafkaDirs = new ArrayList<>(NUMBER_OF_KAFKA_SERVERS);
-               for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
-                       File tmpDir = new File(tmpKafkaParent, "server-" + i);
-                       assertTrue("cannot create kafka temp dir", 
tmpDir.mkdir());
-                       tmpKafkaDirs.add(tmpDir);
-               }
-               
-               zookeeper = null;
-               brokers = null;
-
-               try {
-                       LOG.info("Starting Zookeeper");
-                       zookeeper = new TestingServer(-1, tmpZkDir);
-                       zookeeperConnectionString = 
zookeeper.getConnectString();
-
-                       LOG.info("Starting KafkaServer");
-                       brokers = new ArrayList<>(NUMBER_OF_KAFKA_SERVERS);
-                       
-                       for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
-                               brokers.add(getKafkaServer(i, 
tmpKafkaDirs.get(i), kafkaHost, zookeeperConnectionString));
-                               SocketServer socketServer = 
brokers.get(i).socketServer();
-                               
-                               String host = socketServer.host() == null ? 
"localhost" : socketServer.host();
-                               brokerConnectionStrings += 
hostAndPortToUrlString(host, socketServer.port()) + ",";
-                       }
-
-                       LOG.info("ZK and KafkaServer started.");
-               }
-               catch (Throwable t) {
-                       t.printStackTrace();
-                       fail("Test setup failed: " + t.getMessage());
-               }
-
-               standardProps = new Properties();
-
-               standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
-               standardProps.setProperty("bootstrap.servers", 
brokerConnectionStrings);
-               standardProps.setProperty("group.id", "flink-tests");
-               standardProps.setProperty("auto.commit.enable", "false");
-               standardProps.setProperty("zookeeper.session.timeout.ms", 
"12000"); // 6 seconds is default. Seems to be too small for travis.
-               standardProps.setProperty("zookeeper.connection.timeout.ms", 
"20000");
-               standardProps.setProperty("auto.offset.reset", "earliest"); // 
read from the beginning.
-               standardProps.setProperty("fetch.message.max.bytes", "256"); // 
make a lot of fetches (MESSAGES MUST BE SMALLER!)
-               
-               Properties consumerConfigProps = new Properties();
-               consumerConfigProps.putAll(standardProps);
-               consumerConfigProps.setProperty("auto.offset.reset", 
"smallest");
-               standardCC = new ConsumerConfig(consumerConfigProps);
-               
-               // start also a re-usable Flink mini cluster
-               
-               Configuration flinkConfig = new Configuration();
-               
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-               
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-               
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
-               
flinkConfig.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "0 s");
-
-               flink = new ForkableFlinkMiniCluster(flinkConfig, false);
-               flink.start();
-
-               flinkPort = flink.getLeaderRPCPort();
-       }
-
-       @AfterClass
-       public static void shutDownServices() {
-
-               
LOG.info("-------------------------------------------------------------------------");
-               LOG.info("    Shut down KafkaITCase ");
-               
LOG.info("-------------------------------------------------------------------------");
-
-               flinkPort = -1;
-               if (flink != null) {
-                       flink.shutdown();
-               }
-               
-               for (KafkaServer broker : brokers) {
-                       if (broker != null) {
-                               broker.shutdown();
-                       }
-               }
-               brokers.clear();
-               
-               if (zookeeper != null) {
-                       try {
-                               zookeeper.stop();
-                       }
-                       catch (Exception e) {
-                               LOG.warn("ZK.stop() failed", e);
-                       }
-                       zookeeper = null;
-               }
-               
-               // clean up the temp spaces
-               
-               if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
-                       try {
-                               FileUtils.deleteDirectory(tmpKafkaParent);
-                       }
-                       catch (Exception e) {
-                               // ignore
-                       }
-               }
-               if (tmpZkDir != null && tmpZkDir.exists()) {
-                       try {
-                               FileUtils.deleteDirectory(tmpZkDir);
-                       }
-                       catch (Exception e) {
-                               // ignore
-                       }
-               }
-
-               
LOG.info("-------------------------------------------------------------------------");
-               LOG.info("    KafkaITCase finished"); 
-               
LOG.info("-------------------------------------------------------------------------");
-       }
-
-       /**
-        * Copied from 
com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
-        */
-       protected static KafkaServer getKafkaServer(int brokerId, File 
tmpFolder,
-                                                                               
                String kafkaHost,
-                                                                               
                String zookeeperConnectionString) throws Exception {
-               Properties kafkaProperties = new Properties();
-
-               // properties have to be Strings
-               kafkaProperties.put("advertised.host.name", kafkaHost);
-               kafkaProperties.put("broker.id", Integer.toString(brokerId));
-               kafkaProperties.put("log.dir", tmpFolder.toString());
-               kafkaProperties.put("zookeeper.connect", 
zookeeperConnectionString);
-               kafkaProperties.put("message.max.bytes", String.valueOf(50 * 
1024 * 1024));
-               kafkaProperties.put("replica.fetch.max.bytes", 
String.valueOf(50 * 1024 * 1024));
-               
-               // for CI stability, increase zookeeper session timeout
-               kafkaProperties.put("zookeeper.session.timeout.ms", "20000");
-
-               final int numTries = 5;
-               
-               for (int i = 1; i <= numTries; i++) { 
-                       int kafkaPort = NetUtils.getAvailablePort();
-                       kafkaProperties.put("port", 
Integer.toString(kafkaPort));
-                       KafkaConfig kafkaConfig = new 
KafkaConfig(kafkaProperties);
-
-                       try {
-                               KafkaServer server = new 
KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
-                               server.startup();
-                               return server;
-                       }
-                       catch (KafkaException e) {
-                               if (e.getCause() instanceof BindException) {
-                                       // port conflict, retry...
-                                       LOG.info("Port conflict when starting 
Kafka Broker. Retrying...");
-                               }
-                               else {
-                                       throw e;
-                               }
-                       }
-               }
-               
-               throw new Exception("Could not start Kafka after " + numTries + 
" retries due to port conflicts.");
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Execution utilities
-       // 
------------------------------------------------------------------------
-       
-       protected CuratorFramework createZookeeperClient() {
-               RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 10);
-               CuratorFramework curatorClient = 
CuratorFrameworkFactory.newClient(standardProps.getProperty("zookeeper.connect"),
 retryPolicy);
-               curatorClient.start();
-               return curatorClient;
-       }
-       
-       protected static void tryExecute(StreamExecutionEnvironment see, String 
name) throws Exception {
-               try {
-                       see.execute(name);
-               }
-               catch (ProgramInvocationException | JobExecutionException root) 
{
-                       Throwable cause = root.getCause();
-                       
-                       // search for nested SuccessExceptions
-                       int depth = 0;
-                       while (!(cause instanceof SuccessException)) {
-                               if (cause == null || depth++ == 20) {
-                                       root.printStackTrace();
-                                       fail("Test failed: " + 
root.getMessage());
-                               }
-                               else {
-                                       cause = cause.getCause();
-                               }
-                       }
-               }
-       }
-
-       protected static void 
tryExecutePropagateExceptions(StreamExecutionEnvironment see, String name) 
throws Exception {
-               try {
-                       see.execute(name);
-               }
-               catch (ProgramInvocationException | JobExecutionException root) 
{
-                       Throwable cause = root.getCause();
-
-                       // search for nested SuccessExceptions
-                       int depth = 0;
-                       while (!(cause instanceof SuccessException)) {
-                               if (cause == null || depth++ == 20) {
-                                       throw root;
-                               }
-                               else {
-                                       cause = cause.getCause();
-                               }
-                       }
-               }
-       }
-       
-       
-
-       protected static void createTestTopic(String topic, int 
numberOfPartitions, int replicationFactor) {
-               
-               // create topic with one client
-               Properties topicConfig = new Properties();
-               LOG.info("Creating topic {}", topic);
-
-               ZkClient creator = new ZkClient(standardCC.zkConnect(), 
standardCC.zkSessionTimeoutMs(),
-                               standardCC.zkConnectionTimeoutMs(), new 
ZooKeeperStringSerializer());
-               
-               AdminUtils.createTopic(creator, topic, numberOfPartitions, 
replicationFactor, topicConfig);
-               creator.close();
-               
-               // validate that the topic has been created
-               final long deadline = System.currentTimeMillis() + 30000;
-               do {
-                       try {
-                               Thread.sleep(100);
-                       }
-                       catch (InterruptedException e) {
-                               // restore interrupted state
-                       }
-                       List<KafkaTopicPartitionLeader> partitions = 
FlinkKafkaConsumer.getPartitionsForTopic(Collections.singletonList(topic), 
standardProps);
-                       if (partitions != null && partitions.size() > 0) {
-                               return;
-                       }
-               }
-               while (System.currentTimeMillis() < deadline);
-               fail ("Test topic could not be created");
-       }
-       
-       protected static void deleteTestTopic(String topic) {
-               LOG.info("Deleting topic {}", topic);
-
-               ZkClient zk = new ZkClient(standardCC.zkConnect(), 
standardCC.zkSessionTimeoutMs(),
-                               standardCC.zkConnectionTimeoutMs(), new 
ZooKeeperStringSerializer());
-
-               AdminUtils.deleteTopic(zk, topic);
-               
-               zk.close();
-       }
-}

Reply via email to