Repository: flink
Updated Branches:
  refs/heads/master 83fb2fa89 -> 81320c1c7


http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
deleted file mode 100644
index 75fdd46..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Assert;
-import org.junit.Test;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-
-public class TestFixedPartitioner {
-
-
-       /**
-        * <pre>
-        *              Flink Sinks:            Kafka Partitions
-        *                      1       ---------------->       1
-        *                      2   --------------/
-        *                      3   -------------/
-        *                      4       ------------/
-        * </pre>
-        */
-       @Test
-       public void testMoreFlinkThanBrokers() {
-               FixedPartitioner part = new FixedPartitioner();
-
-               int[] partitions = new int[]{0};
-
-               part.open(0, 4, partitions);
-               Assert.assertEquals(0, part.partition("abc1", 
partitions.length));
-
-               part.open(1, 4, partitions);
-               Assert.assertEquals(0, part.partition("abc2", 
partitions.length));
-
-               part.open(2, 4, partitions);
-               Assert.assertEquals(0, part.partition("abc3", 
partitions.length));
-               Assert.assertEquals(0, part.partition("abc3", 
partitions.length)); // check if it is changing ;)
-
-               part.open(3, 4, partitions);
-               Assert.assertEquals(0, part.partition("abc4", 
partitions.length));
-       }
-
-       /**
-        *
-        * <pre>
-        *              Flink Sinks:            Kafka Partitions
-        *                      1       ---------------->       1
-        *                      2       ---------------->       2
-        *                                                                      
3
-        *                                                                      
4
-        *                                                                      
5
-        *
-        * </pre>
-        */
-       @Test
-       public void testFewerPartitions() {
-               FixedPartitioner part = new FixedPartitioner();
-
-               int[] partitions = new int[]{0, 1, 2, 3, 4};
-               part.open(0, 2, partitions);
-               Assert.assertEquals(0, part.partition("abc1", 
partitions.length));
-               Assert.assertEquals(0, part.partition("abc1", 
partitions.length));
-
-               part.open(1, 2, partitions);
-               Assert.assertEquals(1, part.partition("abc1", 
partitions.length));
-               Assert.assertEquals(1, part.partition("abc1", 
partitions.length));
-       }
-
-       /*
-        *              Flink Sinks:            Kafka Partitions
-        *                      1       ------------>--->       1
-        *                      2       -----------/---->       2
-        *                      3       ----------/
-        */
-       @Test
-       public void testMixedCase() {
-               FixedPartitioner part = new FixedPartitioner();
-               int[] partitions = new int[]{0,1};
-
-               part.open(0, 3, partitions);
-               Assert.assertEquals(0, part.partition("abc1", 
partitions.length));
-
-               part.open(1, 3, partitions);
-               Assert.assertEquals(1, part.partition("abc1", 
partitions.length));
-
-               part.open(2, 3, partitions);
-               Assert.assertEquals(0, part.partition("abc1", 
partitions.length));
-
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
deleted file mode 100644
index 8d16da0..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.admin.AdminUtils;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
-
-import 
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import org.junit.Test;
-
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class ZookeeperOffsetHandlerTest extends KafkaTestBase {
-       
-       @Test
-       public void runOffsetManipulationinZooKeeperTest() {
-               try {
-                       final String topicName = 
"ZookeeperOffsetHandlerTest-Topic";
-                       final String groupId = 
"ZookeeperOffsetHandlerTest-Group";
-                       
-                       final long offset = (long) (Math.random() * 
Long.MAX_VALUE);
-
-                       CuratorFramework curatorFramework = 
createZookeeperClient();
-
-                       {
-                               ZkClient zkClient = new 
ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-                                               
standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
-                               AdminUtils.createTopic(zkClient, topicName, 3, 
2, new Properties());
-                               zkClient.close();
-                       }
-                               
-                       
ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, 
topicName, 0, offset);
-       
-                       long fetchedOffset = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, 
topicName, 0);
-
-                       curatorFramework.close();
-                       
-                       assertEquals(offset, fetchedOffset);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
deleted file mode 100644
index 22c6cfb..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import 
org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-
-import java.util.Random;
-
-@SuppressWarnings("serial")
-public class DataGenerators {
-       
-       public static void 
generateLongStringTupleSequence(StreamExecutionEnvironment env,
-                                                                               
                                String brokerConnection, String topic,
-                                                                               
                                int numPartitions,
-                                                                               
                                final int from, final int to) throws Exception {
-
-               TypeInformation<Tuple2<Integer, Integer>> resultType = 
TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
-               env.setParallelism(numPartitions);
-               env.getConfig().disableSysoutLogging();
-               env.setNumberOfExecutionRetries(0);
-               
-               DataStream<Tuple2<Integer, Integer>> stream =env.addSource(
-                               new RichParallelSourceFunction<Tuple2<Integer, 
Integer>>() {
-
-                                       private volatile boolean running = true;
-
-                                       @Override
-                                       public void 
run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
-                                               int cnt = from;
-                                               int partition = 
getRuntimeContext().getIndexOfThisSubtask();
-
-                                               while (running && cnt <= to) {
-                                                       ctx.collect(new 
Tuple2<Integer, Integer>(partition, cnt));
-                                                       cnt++;
-                                               }
-                                       }
-
-                                       @Override
-                                       public void cancel() {
-                                               running = false;
-                                       }
-                               });
-
-               stream.addSink(new FlinkKafkaProducer<>(topic,
-                               new 
TypeInformationSerializationSchema<>(resultType, env.getConfig()),
-                               
FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnection),
-                               new Tuple2Partitioner(numPartitions)
-               ));
-
-               env.execute("Data generator (Int, Int) stream to topic " + 
topic);
-       }
-
-       // 
------------------------------------------------------------------------
-       
-       public static void 
generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
-                                                                               
                                        String brokerConnection, String topic,
-                                                                               
                                        final int numPartitions,
-                                                                               
                                        final int numElements,
-                                                                               
                                        final boolean randomizeOrder) throws 
Exception {
-               env.setParallelism(numPartitions);
-               env.getConfig().disableSysoutLogging();
-               env.setNumberOfExecutionRetries(0);
-
-               DataStream<Integer> stream = env.addSource(
-                               new RichParallelSourceFunction<Integer>() {
-
-                                       private volatile boolean running = true;
-
-                                       @Override
-                                       public void run(SourceContext<Integer> 
ctx) {
-                                               // create a sequence
-                                               int[] elements = new 
int[numElements];
-                                               for (int i = 0, val = 
getRuntimeContext().getIndexOfThisSubtask();
-                                                               i < numElements;
-                                                               i++, val += 
getRuntimeContext().getNumberOfParallelSubtasks()) {
-                                                       
-                                                       elements[i] = val;
-                                               }
-
-                                               // scramble the sequence
-                                               if (randomizeOrder) {
-                                                       Random rnd = new 
Random();
-                                                       for (int i = 0; i < 
elements.length; i++) {
-                                                               int otherPos = 
rnd.nextInt(elements.length);
-                                                               
-                                                               int tmp = 
elements[i];
-                                                               elements[i] = 
elements[otherPos];
-                                                               
elements[otherPos] = tmp;
-                                                       }
-                                               }
-
-                                               // emit the sequence
-                                               int pos = 0;
-                                               while (running && pos < 
elements.length) {
-                                                       
ctx.collect(elements[pos++]);
-                                               }
-                                       }
-
-                                       @Override
-                                       public void cancel() {
-                                               running = false;
-                                       }
-                               });
-
-               stream
-                               .rebalance()
-                               .addSink(new FlinkKafkaProducer<>(topic,
-                                               new 
TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, 
env.getConfig()),
-                                               
FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnection),
-                                               new KafkaPartitioner() {
-                                                       @Override
-                                                       public int 
partition(Object key, int numPartitions) {
-                                                               return 
((Integer) key) % numPartitions;
-                                                       }
-                                               }));
-
-               env.execute("Scrambles int sequence generator");
-       }
-       
-       // 
------------------------------------------------------------------------
-       
-       public static class InfiniteStringsGenerator extends Thread {
-
-               private final String kafkaConnectionString;
-               
-               private final String topic;
-               
-               private volatile Throwable error;
-               
-               private volatile boolean running = true;
-
-               
-               public InfiniteStringsGenerator(String kafkaConnectionString, 
String topic) {
-                       this.kafkaConnectionString = kafkaConnectionString;
-                       this.topic = topic;
-               }
-
-               @Override
-               public void run() {
-                       // we manually feed data into the Kafka sink
-                       FlinkKafkaProducer<String> producer = null;
-                       try {
-                               producer = new 
FlinkKafkaProducer<>(kafkaConnectionString, topic, new SimpleStringSchema());
-                               producer.setRuntimeContext(new 
MockRuntimeContext(1,0));
-                               producer.open(new Configuration());
-                               
-                               final StringBuilder bld = new StringBuilder();
-                               final Random rnd = new Random();
-                               
-                               while (running) {
-                                       bld.setLength(0);
-                                       
-                                       int len = rnd.nextInt(100) + 1;
-                                       for (int i = 0; i < len; i++) {
-                                               bld.append((char) 
(rnd.nextInt(20) + 'a') );
-                                       }
-                                       
-                                       String next = bld.toString();
-                                       producer.invoke(next);
-                               }
-                       }
-                       catch (Throwable t) {
-                               this.error = t;
-                       }
-                       finally {
-                               if (producer != null) {
-                                       try {
-                                               producer.close();
-                                       }
-                                       catch (Throwable t) {
-                                               // ignore
-                                       }
-                               }
-                       }
-               }
-               
-               public void shutdown() {
-                       this.running = false;
-                       this.interrupt();
-               }
-               
-               public Throwable getError() {
-                       return this.error;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
deleted file mode 100644
index 987e6c5..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-
-/**
- * Sink function that discards data.
- * @param <T> The type of the function.
- */
-public class DiscardingSink<T> implements SinkFunction<T> {
-
-       private static final long serialVersionUID = 2777597566520109843L;
-
-       @Override
-       public void invoke(T value) {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
deleted file mode 100644
index 5a8ffaa..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
-               Checkpointed<Integer>, CheckpointNotifier, Runnable {
-       
-       private static final Logger LOG = 
LoggerFactory.getLogger(FailingIdentityMapper.class);
-       
-       private static final long serialVersionUID = 6334389850158707313L;
-       
-       public static volatile boolean failedBefore;
-       public static volatile boolean hasBeenCheckpointedBeforeFailure;
-
-       private final int failCount;
-       private int numElementsTotal;
-       private int numElementsThisTime;
-       
-       private boolean failer;
-       private boolean hasBeenCheckpointed;
-       
-       private Thread printer;
-       private volatile boolean printerRunning = true;
-
-       public FailingIdentityMapper(int failCount) {
-               this.failCount = failCount;
-       }
-
-       @Override
-       public void open(Configuration parameters) {
-               failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
-               printer = new Thread(this, "FailingIdentityMapper Status 
Printer");
-               printer.start();
-       }
-
-       @Override
-       public T map(T value) throws Exception {
-               numElementsTotal++;
-               numElementsThisTime++;
-               
-               if (!failedBefore) {
-                       Thread.sleep(10);
-                       
-                       if (failer && numElementsTotal >= failCount) {
-                               hasBeenCheckpointedBeforeFailure = 
hasBeenCheckpointed;
-                               failedBefore = true;
-                               throw new Exception("Artificial Test Failure");
-                       }
-               }
-               return value;
-       }
-
-       @Override
-       public void close() throws Exception {
-               printerRunning = false;
-               if (printer != null) {
-                       printer.interrupt();
-                       printer = null;
-               }
-       }
-
-       @Override
-       public void notifyCheckpointComplete(long checkpointId) {
-               this.hasBeenCheckpointed = true;
-       }
-
-       @Override
-       public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
-               return numElementsTotal;
-       }
-
-       @Override
-       public void restoreState(Integer state) {
-               numElementsTotal = state;
-       }
-
-       @Override
-       public void run() {
-               while (printerRunning) {
-                       try {
-                               Thread.sleep(5000);
-                       }
-                       catch (InterruptedException e) {
-                               // ignore
-                       }
-                       LOG.info("============================> Failing mapper  
{}: count={}, totalCount={}",
-                                       
getRuntimeContext().getIndexOfThisSubtask(),
-                                       numElementsThisTime, numElementsTotal);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
deleted file mode 100644
index e94adb5..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class JobManagerCommunicationUtils {
-       
-       private static final FiniteDuration askTimeout = new FiniteDuration(30, 
TimeUnit.SECONDS);
-       
-       
-       public static void cancelCurrentJob(ActorGateway jobManager) throws 
Exception {
-               
-               // find the jobID
-               Future<Object> listResponse = jobManager.ask(
-                               
JobManagerMessages.getRequestRunningJobsStatus(),
-                               askTimeout);
-
-               List<JobStatusMessage> jobs;
-               try {
-                       Object result = Await.result(listResponse, askTimeout);
-                       jobs = ((JobManagerMessages.RunningJobsStatus) 
result).getStatusMessages();
-               }
-               catch (Exception e) {
-                       throw new Exception("Could not cancel job - failed to 
retrieve running jobs from the JobManager.", e);
-               }
-               
-               if (jobs.isEmpty()) {
-                       throw new Exception("Could not cancel job - no running 
jobs");
-               }
-               if (jobs.size() != 1) {
-                       throw new Exception("Could not cancel job - more than 
one running job.");
-               }
-               
-               JobStatusMessage status = jobs.get(0);
-               if (status.getJobState().isTerminalState()) {
-                       throw new Exception("Could not cancel job - job is not 
running any more");
-               }
-               
-               JobID jobId = status.getJobId();
-               
-               Future<Object> response = jobManager.ask(new 
JobManagerMessages.CancelJob(jobId), askTimeout);
-               try {
-                       Await.result(response, askTimeout);
-               }
-               catch (Exception e) {
-                       throw new Exception("Sending the 'cancel' message 
failed.", e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
deleted file mode 100644
index b8afe3a..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.DoubleCounter;
-import org.apache.flink.api.common.accumulators.Histogram;
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class MockRuntimeContext extends StreamingRuntimeContext {
-
-       private final int numberOfParallelSubtasks;
-       private final int indexOfThisSubtask;
-
-       public MockRuntimeContext(int numberOfParallelSubtasks, int 
indexOfThisSubtask) {
-               super(new MockStreamOperator(),
-                               new MockEnvironment("no", 4 * 
MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
-                               Collections.<String, Accumulator<?, 
?>>emptyMap());
-               this.numberOfParallelSubtasks = numberOfParallelSubtasks;
-               this.indexOfThisSubtask = indexOfThisSubtask;
-       }
-
-       private static class MockStreamOperator extends AbstractStreamOperator {
-               private static final long serialVersionUID = 
-1153976702711944427L;
-
-               @Override
-               public ExecutionConfig getExecutionConfig() {
-                       return new ExecutionConfig();
-               }
-       }
-
-       @Override
-       public boolean isCheckpointingEnabled() {
-               return true;
-       }
-
-       @Override
-       public String getTaskName() {
-               return null;
-       }
-
-       @Override
-       public int getNumberOfParallelSubtasks() {
-               return numberOfParallelSubtasks;
-       }
-
-       @Override
-       public int getIndexOfThisSubtask() {
-               return indexOfThisSubtask;
-       }
-
-       @Override
-       public int getAttemptNumber() {
-               return 0;
-       }
-
-       @Override
-       public ExecutionConfig getExecutionConfig() {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public ClassLoader getUserCodeClassLoader() {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public <V, A extends Serializable> void addAccumulator(String name, 
Accumulator<V, A> accumulator) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public <V, A extends Serializable> Accumulator<V, A> 
getAccumulator(String name) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public Map<String, Accumulator<?, ?>> getAllAccumulators() {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public IntCounter getIntCounter(String name) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public LongCounter getLongCounter(String name) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public DoubleCounter getDoubleCounter(String name) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public Histogram getHistogram(String name) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public <RT> List<RT> getBroadcastVariable(String name) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public <T, C> C getBroadcastVariableWithInitializer(String name, 
BroadcastVariableInitializer<T, C> initializer) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public DistributedCache getDistributedCache() {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public <S> OperatorState<S> getKeyValueState(String name, Class<S> 
stateType, S defaultState) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public <S> OperatorState<S> getKeyValueState(String name, 
TypeInformation<S> stateType, S defaultState) {
-               throw new UnsupportedOperationException();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
deleted file mode 100644
index e105e01..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-
-import java.util.HashSet;
-import java.util.Set;
-
-
-public class PartitionValidatingMapper implements MapFunction<Integer, 
Integer> {
-
-       private static final long serialVersionUID = 1088381231244959088L;
-       
-       /* the partitions from which this function received data */
-       private final Set<Integer> myPartitions = new HashSet<>();
-       
-       private final int numPartitions;
-       private final int maxPartitions;
-
-       public PartitionValidatingMapper(int numPartitions, int maxPartitions) {
-               this.numPartitions = numPartitions;
-               this.maxPartitions = maxPartitions;
-       }
-
-       @Override
-       public Integer map(Integer value) throws Exception {
-               // validate that the partitioning is identical
-               int partition = value % numPartitions;
-               myPartitions.add(partition);
-               if (myPartitions.size() > maxPartitions) {
-                       throw new Exception("Error: Elements from too many 
different partitions: " + myPartitions
-                                       + ". Expect elements only from " + 
maxPartitions + " partitions");
-               }
-               return value;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java
deleted file mode 100644
index 12e3460..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-/**
- * Exception that is thrown to terminate a program and indicate success.
- */
-public class SuccessException extends Exception {
-       private static final long serialVersionUID = -7011865671593955887L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
deleted file mode 100644
index 1d61229..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-
-/**
- * An identity map function that sleeps between elements, throttling the
- * processing speed.
- * 
- * @param <T> The type mapped.
- */
-public class ThrottledMapper<T> implements MapFunction<T,T> {
-
-       private static final long serialVersionUID = 467008933767159126L;
-
-       private final int sleep;
-
-       public ThrottledMapper(int sleep) {
-               this.sleep = sleep;
-       }
-
-       @Override
-       public T map(T value) throws Exception {
-               Thread.sleep(this.sleep);
-               return value;
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
deleted file mode 100644
index b762e21..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-
-import java.io.Serializable;
-
-/**
- * Special partitioner that uses the first field of a 2-tuple as the partition,
- * and that expects a specific number of partitions.
- */
-public class Tuple2Partitioner extends KafkaPartitioner implements 
Serializable {
-       
-       private static final long serialVersionUID = 1L;
-
-       private final int expectedPartitions;
-
-       
-       public Tuple2Partitioner(int expectedPartitions) {
-               this.expectedPartitions = expectedPartitions;
-       }
-
-       @Override
-       public int partition(Object key, int numPartitions) {
-               if (numPartitions != expectedPartitions) {
-                       throw new IllegalArgumentException("Expected " + 
expectedPartitions + " partitions");
-               }
-               @SuppressWarnings("unchecked")
-               Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) 
key;
-               
-               return element.f0;
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
deleted file mode 100644
index f3cc4fa..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.BitSet;
-
-public class ValidatingExactlyOnceSink implements SinkFunction<Integer>, 
Checkpointed<Tuple2<Integer, BitSet>> {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
-
-       private static final long serialVersionUID = 1748426382527469932L;
-       
-       private final int numElementsTotal;
-       
-       private BitSet duplicateChecker = new BitSet();  // this is checkpointed
-
-       private int numElements; // this is checkpointed
-
-       
-       public ValidatingExactlyOnceSink(int numElementsTotal) {
-               this.numElementsTotal = numElementsTotal;
-       }
-
-       
-       @Override
-       public void invoke(Integer value) throws Exception {
-               numElements++;
-               
-               if (duplicateChecker.get(value)) {
-                       throw new Exception("Received a duplicate");
-               }
-               duplicateChecker.set(value);
-               if (numElements == numElementsTotal) {
-                       // validate
-                       if (duplicateChecker.cardinality() != numElementsTotal) 
{
-                               throw new Exception("Duplicate checker has 
wrong cardinality");
-                       }
-                       else if (duplicateChecker.nextClearBit(0) != 
numElementsTotal) {
-                               throw new Exception("Received sparse sequence");
-                       }
-                       else {
-                               throw new SuccessException();
-                       }
-               }
-       }
-
-       @Override
-       public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long 
checkpointTimestamp) {
-               LOG.info("Snapshot of counter "+numElements+" at checkpoint 
"+checkpointId);
-               return new Tuple2<>(numElements, duplicateChecker);
-       }
-
-       @Override
-       public void restoreState(Tuple2<Integer, BitSet> state) {
-               LOG.info("restoring num elements to {}", state.f0);
-               this.numElements = state.f0;
-               this.duplicateChecker = state.f1;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
deleted file mode 100644
index 6bdfb48..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-    <logger name="org.apache.flink.streaming" level="WARN"/>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/pom.xml 
b/flink-streaming-connectors/pom.xml
index 1b829f2..dead481 100644
--- a/flink-streaming-connectors/pom.xml
+++ b/flink-streaming-connectors/pom.xml
@@ -37,7 +37,9 @@ under the License.
 
        <modules>
                <module>flink-connector-flume</module>
-               <module>flink-connector-kafka</module>
+               <module>flink-connector-kafka-base</module>
+               <module>flink-connector-kafka-0.8</module>
+               <module>flink-connector-kafka-0.9</module>
                <module>flink-connector-elasticsearch</module>
                <module>flink-connector-rabbitmq</module>
                <module>flink-connector-twitter</module>

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 0320d6b..409304a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -132,7 +132,7 @@ public class CheckpointConfig implements 
java.io.Serializable {
         * @param checkpointTimeout The checkpoint timeout, in milliseconds.
         */
        public void setCheckpointTimeout(long checkpointTimeout) {
-               if (checkpointInterval <= 0) {
+               if (checkpointTimeout <= 0) {
                        throw new IllegalArgumentException("Checkpoint timeout 
must be larger than zero");
                }
                this.checkpointTimeout = checkpointTimeout;

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
deleted file mode 100644
index 80bea8d..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * The deserialization schema describes how to turn the byte key / value 
messages delivered by certain
- * data sources (for example Apache Kafka) into data types (Java/Scala 
objects) that are
- * processed by Flink.
- * 
- * @param <T> The type created by the keyed deserialization schema.
- */
-public interface KeyedDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
-
-       /**
-        * Deserializes the byte message.
-        *
-        * @param messageKey the key as a byte array (null if no key has been 
set)
-        * @param message The message, as a byte array. (null if the message 
was empty or deleted)
-        * @param offset the offset of the message in the original source (for 
example the Kafka offset)
-        * @return The deserialized message as an object.
-        */
-       T deserialize(byte[] messageKey, byte[] message, String topic, long 
offset) throws IOException;
-
-       /**
-        * Method to decide whether the element signals the end of the stream. 
If
-        * true is returned the element won't be emitted.
-        * 
-        * @param nextElement The element to test for the end-of-stream signal.
-        * @return True, if the element signals end of stream, false otherwise.
-        */
-       boolean isEndOfStream(T nextElement);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
deleted file mode 100644
index 8d9cf5d..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import java.io.IOException;
-
-/**
- * A simple wrapper for using the DeserializationSchema with the 
KeyedDeserializationSchema
- * interface
- * @param <T> The type created by the deserialization schema.
- */
-public class KeyedDeserializationSchemaWrapper<T> implements 
KeyedDeserializationSchema<T> {
-
-       private static final long serialVersionUID = 2651665280744549932L;
-
-       private final DeserializationSchema<T> deserializationSchema;
-
-       public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> 
deserializationSchema) {
-               this.deserializationSchema = deserializationSchema;
-       }
-       @Override
-       public T deserialize(byte[] messageKey, byte[] message, String topic, 
long offset) throws IOException {
-               return deserializationSchema.deserialize(message);
-       }
-
-       @Override
-       public boolean isEndOfStream(T nextElement) {
-               return deserializationSchema.isEndOfStream(nextElement);
-       }
-
-       @Override
-       public TypeInformation<T> getProducedType() {
-               return deserializationSchema.getProducedType();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
deleted file mode 100644
index be3e87e..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.Serializable;
-
-/**
- * The serialization schema describes how to turn a data object into a 
different serialized
- * representation. Most data sinks (for example Apache Kafka) require the data 
to be handed
- * to them in a specific format (for example as byte strings).
- * 
- * @param <T> The type to be serialized.
- */
-public interface KeyedSerializationSchema<T> extends Serializable {
-
-       /**
-        * Serializes the key of the incoming element to a byte array
-        * This method might return null if no key is available.
-        *
-        * @param element The incoming element to be serialized
-        * @return the key of the element as a byte array
-        */
-       byte[] serializeKey(T element);
-
-
-       /**
-        * Serializes the value of the incoming element to a byte array
-        * 
-        * @param element The incoming element to be serialized
-        * @return the value of the element as a byte array
-        */
-       byte[] serializeValue(T element);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
deleted file mode 100644
index a1a8fc0..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-/**
- * A simple wrapper for using the SerializationSchema with the 
KeyedDeserializationSchema
- * interface
- * @param <T> The type to serialize
- */
-public class KeyedSerializationSchemaWrapper<T> implements 
KeyedSerializationSchema<T> {
-
-       private static final long serialVersionUID = 1351665280744549933L;
-
-       private final SerializationSchema<T> serializationSchema;
-
-       public KeyedSerializationSchemaWrapper(SerializationSchema<T> 
serializationSchema) {
-               this.serializationSchema = serializationSchema;
-       }
-
-       @Override
-       public byte[] serializeKey(T element) {
-               return null;
-       }
-
-       @Override
-       public byte[] serializeValue(T element) {
-               return serializationSchema.serialize(element);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
deleted file mode 100644
index 250012f..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-
-import java.io.IOException;
-
-/**
- * A serialization and deserialization schema for Key Value Pairs that uses 
Flink's serialization stack to
- * transform typed from and to byte arrays.
- * 
- * @param <K> The key type to be serialized.
- * @param <V> The value type to be serialized.
- */
-public class TypeInformationKeyValueSerializationSchema<K, V> implements 
KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K,V>> 
{
-
-       private static final long serialVersionUID = -5359448468131559102L;
-
-       /** The serializer for the key */
-       private final TypeSerializer<K> keySerializer;
-
-       /** The serializer for the value */
-       private final TypeSerializer<V> valueSerializer;
-
-       /** reusable output serialization buffers */
-       private transient DataOutputSerializer keyOutputSerializer;
-       private transient DataOutputSerializer valueOutputSerializer;
-
-       /** The type information, to be returned by {@link #getProducedType()}. 
It is
-        * transient, because it is not serializable. Note that this means that 
the type information
-        * is not available at runtime, but only prior to the first 
serialization / deserialization */
-       private final transient TypeInformation<Tuple2<K, V>> typeInfo;
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a new de-/serialization schema for the given types.
-        *
-        * @param keyTypeInfo The type information for the key type 
de-/serialized by this schema.
-        * @param valueTypeInfo The type information for the value type 
de-/serialized by this schema.
-        * @param ec The execution config, which is used to parametrize the 
type serializers.
-        */
-       public TypeInformationKeyValueSerializationSchema(TypeInformation<K> 
keyTypeInfo, TypeInformation<V> valueTypeInfo, ExecutionConfig ec) {
-               this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo);
-               this.keySerializer = keyTypeInfo.createSerializer(ec);
-               this.valueSerializer = valueTypeInfo.createSerializer(ec);
-       }
-
-       public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, 
Class<V> valueClass, ExecutionConfig config) {
-               //noinspection unchecked
-               this( (TypeInformation<K>) 
TypeExtractor.createTypeInfo(keyClass), (TypeInformation<V>) 
TypeExtractor.createTypeInfo(valueClass), config);
-       }
-
-       // 
------------------------------------------------------------------------
-
-
-       @Override
-       public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, 
String topic, long offset) throws IOException {
-               K key = null;
-               if(messageKey != null) {
-                       key = keySerializer.deserialize(new 
ByteArrayInputView(messageKey));
-               }
-               V value = null;
-               if(message != null) {
-                       value = valueSerializer.deserialize(new 
ByteArrayInputView(message));
-               }
-               return new Tuple2<>(key, value);
-       }
-
-       /**
-        * This schema never considers an element to signal end-of-stream, so 
this method returns always false.
-        * @param nextElement The element to test for the end-of-stream signal.
-        * @return Returns false.
-        */
-       @Override
-       public boolean isEndOfStream(Tuple2<K,V> nextElement) {
-               return false;
-       }
-
-
-       @Override
-       public byte[] serializeKey(Tuple2<K, V> element) {
-               if(element.f0 == null) {
-                       return null;
-               } else {
-                       // key is not null. serialize it:
-                       if (keyOutputSerializer == null) {
-                               keyOutputSerializer = new 
DataOutputSerializer(16);
-                       }
-                       try {
-                               keySerializer.serialize(element.f0, 
keyOutputSerializer);
-                       }
-                       catch (IOException e) {
-                               throw new RuntimeException("Unable to serialize 
record", e);
-                       }
-                       // check if key byte array size changed
-                       byte[] res = keyOutputSerializer.getByteArray();
-                       if (res.length != keyOutputSerializer.length()) {
-                               byte[] n = new 
byte[keyOutputSerializer.length()];
-                               System.arraycopy(res, 0, n, 0, 
keyOutputSerializer.length());
-                               res = n;
-                       }
-                       keyOutputSerializer.clear();
-                       return res;
-               }
-       }
-
-       @Override
-       public byte[] serializeValue(Tuple2<K, V> element) {
-               // if the value is null, its serialized value is null as well.
-               if(element.f1 == null) {
-                       return null;
-               }
-
-               if (valueOutputSerializer == null) {
-                       valueOutputSerializer = new DataOutputSerializer(16);
-               }
-
-               try {
-                       valueSerializer.serialize(element.f1, 
valueOutputSerializer);
-               }
-               catch (IOException e) {
-                       throw new RuntimeException("Unable to serialize 
record", e);
-               }
-
-               byte[] res = valueOutputSerializer.getByteArray();
-               if (res.length != valueOutputSerializer.length()) {
-                       byte[] n = new byte[valueOutputSerializer.length()];
-                       System.arraycopy(res, 0, n, 0, 
valueOutputSerializer.length());
-                       res = n;
-               }
-               valueOutputSerializer.clear();
-               return res;
-       }
-
-
-       @Override
-       public TypeInformation<Tuple2<K,V>> getProducedType() {
-               if (typeInfo != null) {
-                       return typeInfo;
-               }
-               else {
-                       throw new IllegalStateException(
-                                       "The type information is not available 
after this class has been serialized and distributed.");
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
index ce9c9ca..6577be8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
@@ -30,7 +30,6 @@ import java.io.IOException;
  * A serialization and deserialization schema that uses Flink's serialization 
stack to
  * transform typed from and to byte arrays.
  *
- * @see TypeInformationKeyValueSerializationSchema for a serialization schema 
supporting Key Value pairs.
  * 
  * @param <T> The type to be serialized.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 3493f18..304dcb5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -21,10 +21,8 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
@@ -36,6 +34,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -45,6 +44,7 @@ import org.junit.Test;
 import java.util.HashMap;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.*;
 
 /**
@@ -577,27 +577,6 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       public static void tryExecute(StreamExecutionEnvironment env, String 
jobName) throws Exception {
-               try {
-                       env.execute(jobName);
-               }
-               catch (ProgramInvocationException | JobExecutionException root) 
{
-                       Throwable cause = root.getCause();
-
-                       // search for nested SuccessExceptions
-                       int depth = 0;
-                       while (!(cause instanceof SuccessException)) {
-                               if (cause == null || depth++ == 20) {
-                                       root.printStackTrace();
-                                       fail("Test failed: " + 
root.getMessage());
-                               }
-                               else {
-                                       cause = cause.getCause();
-                               }
-                       }
-               }
-       }
-
        public static class IntType {
 
                public int value;
@@ -606,8 +585,4 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
 
                public IntType(int value) { this.value = value; }
        }
-
-       static final class SuccessException extends Exception {
-               private static final long serialVersionUID = 
-9218191172606739598L;
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 3e0c0f3..81e8f0a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -23,10 +23,8 @@ import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
@@ -38,6 +36,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -47,6 +46,7 @@ import org.junit.Test;
 import java.util.HashMap;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.*;
 
 /**
@@ -728,27 +728,6 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       public static void tryExecute(StreamExecutionEnvironment env, String 
jobName) throws Exception {
-               try {
-                       env.execute(jobName);
-               }
-               catch (ProgramInvocationException | JobExecutionException root) 
{
-                       Throwable cause = root.getCause();
-
-                       // search for nested SuccessExceptions
-                       int depth = 0;
-                       while (!(cause instanceof SuccessException)) {
-                               if (cause == null || depth++ == 20) {
-                                       root.printStackTrace();
-                                       fail("Test failed: " + 
root.getMessage());
-                               }
-                               else {
-                                       cause = cause.getCause();
-                               }
-                       }
-               }
-       }
-
        public static class IntType {
 
                public int value;
@@ -757,8 +736,4 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
 
                public IntType(int value) { this.value = value; }
        }
-
-       static final class SuccessException extends Exception {
-               private static final long serialVersionUID = 
-9218191172606739598L;
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index e4ebfa0..500d7d3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -50,6 +51,7 @@ import java.util.HashMap;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
+import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -491,27 +493,6 @@ public class WindowCheckpointingITCase extends TestLogger {
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       public static void tryExecute(StreamExecutionEnvironment env, String 
jobName) throws Exception {
-               try {
-                       env.execute(jobName);
-               }
-               catch (ProgramInvocationException | JobExecutionException root) 
{
-                       Throwable cause = root.getCause();
-
-                       // search for nested SuccessExceptions
-                       int depth = 0;
-                       while (!(cause instanceof SuccessException)) {
-                               if (cause == null || depth++ == 20) {
-                                       root.printStackTrace();
-                                       fail("Test failed: " + 
root.getMessage());
-                               }
-                               else {
-                                       cause = cause.getCause();
-                               }
-                       }
-               }
-       }
-
        public static class IntType {
 
                public int value;
@@ -520,8 +501,4 @@ public class WindowCheckpointingITCase extends TestLogger {
 
                public IntType(int value) { this.value = value; }
        }
-
-       static final class SuccessException extends Exception {
-               private static final long serialVersionUID = 
-9218191172606739598L;
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java 
b/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java
new file mode 100644
index 0000000..22ac02b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+/**
+ * Exception that is thrown to terminate a program and indicate success.
+ */
+public class SuccessException extends Exception {
+       private static final long serialVersionUID = -7011865671593955887L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java 
b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
new file mode 100644
index 0000000..86b5002
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import static org.junit.Assert.fail;
+
+public class TestUtils {
+       public static JobExecutionResult tryExecute(StreamExecutionEnvironment 
see, String name) throws Exception {
+               try {
+                       return see.execute(name);
+               }
+               catch (ProgramInvocationException | JobExecutionException root) 
{
+                       Throwable cause = root.getCause();
+
+                       // search for nested SuccessExceptions
+                       int depth = 0;
+                       while (!(cause instanceof SuccessException)) {
+                               if (cause == null || depth++ == 20) {
+                                       root.printStackTrace();
+                                       fail("Test failed: " + 
root.getMessage());
+                               }
+                               else {
+                                       cause = cause.getCause();
+                               }
+                       }
+               }
+               return null;
+       }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8ae24dd..bad12da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -822,6 +822,7 @@ under the License.
                                                <exclude>**/*.iml</exclude>
                                                
<exclude>flink-quickstart/**/testArtifact/goal.txt</exclude>
                                                <!-- Generated content -->
+                                               <exclude>out/**</exclude>
                                                <exclude>**/target/**</exclude>
                                                <exclude>docs/_site/**</exclude>
                                                
<exclude>**/scalastyle-output.xml</exclude>

Reply via email to