[hotfix] [kafka consumer] Increase Kafka test stability by validating written data before consuming


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d20eda1b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d20eda1b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d20eda1b

Branch: refs/heads/master
Commit: d20eda1b5252d888189af29a2b493023b4621a88
Parents: af79988
Author: Stephan Ewen <[email protected]>
Authored: Tue Apr 12 20:14:50 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Apr 13 01:10:54 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/Kafka08ITCase.java         |  72 ++----
 .../connectors/kafka/KafkaConsumerTestBase.java | 217 +++++++++++++++----
 .../connectors/kafka/KafkaTestBase.java         |   4 -
 .../testutils/JobManagerCommunicationUtils.java |  24 +-
 .../kafka/testutils/Tuple2Partitioner.java      |   7 +-
 .../org/apache/flink/test/util/TestUtils.java   |   4 +-
 6 files changed, 217 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
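The change addresses a flaky-test root cause: the tests write a known sequence into a Kafka topic and then consume it, but the producer gives no exactly-once guarantee, so a dropped record can make the consuming side hang or miscount. writeSequence() therefore now writes with retries=0 (at-most-once, hence no duplicates), reads the topic back to confirm every record arrived, and retries on a fresh topic until validation succeeds. Below is a minimal, self-contained sketch of that pattern; the lossy in-memory list stands in for the at-most-once producer, and all names are illustrative rather than the commit's code:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;

    public class WriteValidateRetry {

        static final int NUM_ELEMENTS = 100;
        static final int MAX_ATTEMPTS = 10;

        public static void main(String[] args) throws Exception {
            System.out.println("validated topic: " + writeValidatedSequence("demoTopic"));
        }

        /** Mirrors the structure of the new writeSequence(): one fresh topic per attempt. */
        static String writeValidatedSequence(String baseTopicName) throws Exception {
            for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
                String topicName = baseTopicName + '-' + attempt;

                // -------- write the sequence (at-most-once, so no duplicates) --------
                List<Integer> topic = lossyWrite(NUM_ELEMENTS);

                // -------- validate before anyone consumes it --------
                if (topic.size() == NUM_ELEMENTS) {
                    return topicName; // complete sequence; safe for the actual test
                }
                // records were dropped: discard this topic and fall through the loop
            }
            throw new Exception("Could not write a valid sequence after " + MAX_ATTEMPTS + " attempts");
        }

        /** Stand-in for a Kafka producer with retries=0: drops each record with 1% probability. */
        static List<Integer> lossyWrite(int numElements) {
            List<Integer> sink = new ArrayList<>();
            for (int i = 0; i < numElements; i++) {
                if (ThreadLocalRandom.current().nextDouble() >= 0.01) {
                    sink.add(i);
                }
            }
            return sink;
        }
    }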


http://git-wip-us.apache.org/repos/asf/flink/blob/d20eda1b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 0aef3bd..530c032 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -18,9 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -33,7 +31,6 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
@@ -91,16 +88,11 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
        @Test(timeout = 60000)
        public void testInvalidOffset() throws Exception {
-               final String topic = "invalidOffsetTopic";
+               
                final int parallelism = 1;
-
-               // create topic
-               createTestTopic(topic, parallelism, 1);
-
-               final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-
+
                // write 20 messages into topic:
-               writeSequence(env, topic, 20, parallelism);
+               final String topic = writeSequence("invalidOffsetTopic", 20, parallelism, 1);
 
                // set invalid offset:
                CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
@@ -110,6 +102,10 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
                // read from topic
                final int valuesCount = 20;
                final int startFrom = 0;
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env.getConfig().disableSysoutLogging();
+
                readSequence(env, standardProps, parallelism, topic, valuesCount, startFrom);
 
                deleteTestTopic(topic);
@@ -193,10 +189,10 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
         */
        @Test(timeout = 60000)
        public void testOffsetInZookeeper() throws Exception {
-               final String topicName = "testOffsetInZK";
                final int parallelism = 3;
 
-               createTestTopic(topicName, parallelism, 1);
+               // write a sequence from 0 to 99 to each of the 3 partitions.
+               final String topicName = writeSequence("testOffsetInZK", 100, parallelism, 1);
 
                StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
                env1.getConfig().disableSysoutLogging();
@@ -210,16 +206,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
                env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                env2.setParallelism(parallelism);
 
-               StreamExecutionEnvironment env3 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env3.getConfig().disableSysoutLogging();
-               env3.enableCheckpointing(50);
-               env3.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-               env3.setParallelism(parallelism);
-
-               // write a sequence from 0 to 99 to each of the 3 partitions.
-               writeSequence(env1, topicName, 100, parallelism);
-
-               readSequence(env2, standardProps, parallelism, topicName, 100, 0);
+               readSequence(env1, standardProps, parallelism, topicName, 100, 0);
 
                CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
 
@@ -243,33 +230,23 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
                curatorClient.close();
 
                // create new env
-               readSequence(env3, standardProps, parallelism, topicName, 50, 50);
+               readSequence(env2, standardProps, parallelism, topicName, 50, 50);
 
                deleteTestTopic(topicName);
        }
 
        @Test(timeout = 60000)
        public void testOffsetAutocommitTest() throws Exception {
-               final String topicName = "testOffsetAutocommit";
                final int parallelism = 3;
 
-               createTestTopic(topicName, parallelism, 1);
-
-               StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env1.getConfig().disableSysoutLogging();
-               env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-               env1.setParallelism(parallelism);
-
-               StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               // NOTE: We are not enabling the checkpointing!
-               env2.getConfig().disableSysoutLogging();
-               env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-               env2.setParallelism(parallelism);
-
-
                // write a sequence from 0 to 99 to each of the 3 partitions.
-               writeSequence(env1, topicName, 100, parallelism);
+               final String topicName = writeSequence("testOffsetAutocommit", 100, parallelism, 1);
 
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               // NOTE: We are not enabling the checkpointing!
+               env.getConfig().disableSysoutLogging();
+               env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+               env.setParallelism(parallelism);
 
                // the readSequence operation sleeps for 20 ms between each record.
                // setting a delay of 25*20 = 500 for the commit interval makes
@@ -280,7 +257,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
                readProps.setProperty("auto.commit.interval.ms", "500");
 
                // read so that the offset can be committed to ZK
-               readSequence(env2, readProps, parallelism, topicName, 100, 0);
+               readSequence(env, readProps, parallelism, topicName, 100, 0);
 
                // get the offset
                CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
@@ -314,19 +291,10 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
         */
        @Test(timeout = 60000)
        public void testKafkaOffsetRetrievalToZookeeper() throws Exception {
-               final String topicName = "testKafkaOffsetToZk";
                final int parallelism = 3;
-               
-               createTestTopic(topicName, parallelism, 1);
-               
-               StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env1.getConfig().disableSysoutLogging();
-               env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-               env1.setParallelism(parallelism);
 
                // write a sequence from 0 to 49 to each of the 3 partitions.
-               writeSequence(env1, topicName, 50, parallelism);
-
+               final String topicName = writeSequence("testKafkaOffsetToZk", 50, parallelism, 1);
 
                final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
                env2.getConfig().disableSysoutLogging();
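
Note how each test above now consumes the topic name returned by writeSequence() instead of a hard-coded constant: the returned name carries the attempt suffix of the first write that validated cleanly, so the base name alone may not exist. The call pattern, straight from the diff:

    // write a sequence from 0 to 99 to each of the 3 partitions,
    // retrying on a fresh topic until the written data validates
    final String topicName = writeSequence("testOffsetInZK", 100, parallelism, 1);

    // consume using the returned (suffixed) name, e.g. "testOffsetInZK-1"
    readSequence(env1, standardProps, parallelism, topicName, 100, 0);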

http://git-wip-us.apache.org/repos/asf/flink/blob/d20eda1b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index dd468a4..8ff67b4 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -49,13 +50,13 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -65,7 +66,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
-import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
 import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
@@ -74,6 +74,7 @@ import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;
@@ -90,6 +91,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.junit.Assert;
 
+import org.junit.Before;
 import org.junit.Rule;
 
 import java.io.ByteArrayInputStream;
@@ -121,8 +123,22 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
        
        @Rule
        public RetryRule retryRule = new RetryRule();
-       
 
+
+       // ------------------------------------------------------------------------
+       //  Common Test Preparation
+       // ------------------------------------------------------------------------
+
+       /**
+        * Makes sure that no job is on the JobManager any more from any previous tests that use
+        * the same mini cluster. Otherwise, missing slots may happen.
+        */
+       @Before
+       public void ensureNoJobIsLingering() throws Exception {
+               JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+       }
+
+
        // ------------------------------------------------------------------------
        //  Suite of Tests
        //
@@ -131,7 +147,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
        //  select which tests to run.
        // ------------------------------------------------------------------------
 
-
        /**
         * Test that ensures the KafkaConsumer is properly failing if the topic doesn't exist
         * and a wrong broker was specified
@@ -1080,17 +1095,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
         * @throws Exception
         */
        public void runMetricsAndEndOfStreamTest() throws Exception {
-               final String topic = "testEndOfStream";
-               createTestTopic(topic, 1, 1);
-               final int ELEMENT_COUNT = 300;
 
-               // write some data
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env.setParallelism(1);
-               env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-               env.getConfig().disableSysoutLogging();
-
-               writeSequence(env, topic, ELEMENT_COUNT, 1);
+               final int ELEMENT_COUNT = 300;
+               final String topic = writeSequence("testEndOfStream", ELEMENT_COUNT, 1, 1);
 
                // read using custom schema
                final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
@@ -1098,7 +1105,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                env1.getConfig().disableSysoutLogging();
 
-               DataStream<Tuple2<Integer, Integer>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), standardProps));
+               DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), standardProps));
                fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Void>() {
                        @Override
                        public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
@@ -1106,12 +1113,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        }
                });
 
-               JobExecutionResult result = tryExecute(env, "Consume " + ELEMENT_COUNT + " elements from Kafka");
+               JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");
 
                Map<String, Object> accuResults = result.getAllAccumulatorResults();
                // kafka 0.9 consumer: 39 results
-               if(kafkaServer.getVersion().equals("0.9")) {
-                       Assert.assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38);
+               if (kafkaServer.getVersion().equals("0.9")) {
+                       assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38);
                }
 
                deleteTestTopic(topic);
@@ -1188,7 +1195,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                                                int v = values[i];
                                                if (v != sourceParallelism) {
                                                        printTopic(topicName, valuesCount, deser);
-                                                       throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
+                                                       throw new RuntimeException("Expected v to be " + sourceParallelism +
+                                                                       ", but was " + v + " on element " + i + " array=" + Arrays.toString(values));
                                                }
                                        }
                                        // test has passed
@@ -1203,42 +1211,161 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                LOG.info("Successfully read sequence for verification");
        }
 
-       protected void writeSequence(StreamExecutionEnvironment env, String topicName, final int numElements, int parallelism) throws Exception {
+       protected String writeSequence(
+                       String baseTopicName,
+                       final int numElements,
+                       final int parallelism,
+                       final int replicationFactor) throws Exception
+       {
+               LOG.info("\n===================================\n" +
+                               "== Writing sequence of " + numElements + " into " + baseTopicName + " with p=" + parallelism + "\n" +
+                               "===================================");
 
-               LOG.info("\n===================================\n== Writing sequence of "+numElements+" into "+topicName+" with p="+parallelism+"\n===================================");
-               TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+               final TypeInformation<Tuple2<Integer, Integer>> resultType =
+                               TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
 
-               DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+               final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
+                               new KeyedSerializationSchemaWrapper<>(
+                                               new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
 
-                       private boolean running = true;
+               final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
+                               new KeyedDeserializationSchemaWrapper<>(
+                                               new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+               
+               final int maxNumAttempts = 10;
 
-                       @Override
-                       public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
-                               int cnt = 0;
-                               int partition = getRuntimeContext().getIndexOfThisSubtask();
+               for (int attempt = 1; attempt <= maxNumAttempts; attempt++) {
+                       
+                       final String topicName = baseTopicName + '-' + attempt;
+                       
+                       LOG.info("Writing attempt #" + attempt);
+                       
+                       // -------- Write the Sequence --------
+                       
+                       createTestTopic(topicName, parallelism, replicationFactor);
 
-                               while (running && cnt < numElements) {
-                                       ctx.collect(new Tuple2<>(partition, cnt));
-                                       cnt++;
+                       StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+                       writeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+                       writeEnv.getConfig().disableSysoutLogging();
+
+                       DataStream<Tuple2<Integer, Integer>> stream = writeEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+
+                               private boolean running = true;
+
+                               @Override
+                               public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+                                       int cnt = 0;
+                                       int partition = getRuntimeContext().getIndexOfThisSubtask();
+
+                                       while (running && cnt < numElements) {
+                                               ctx.collect(new Tuple2<>(partition, cnt));
+                                               cnt++;
+                                       }
                                }
-                       }
+
+                               @Override
+                               public void cancel() {
+                                       running = false;
+                               }
+                       }).setParallelism(parallelism);
+
+                       // the producer must not produce duplicates
+                       Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+                       producerProperties.setProperty("retries", "0");
+
+                       stream.addSink(kafkaServer.getProducer(
+                                                       topicName, serSchema, producerProperties,
+                                                       new Tuple2Partitioner(parallelism)))
+                                       .setParallelism(parallelism);
 
-                       @Override
-                       public void cancel() {
-                               running = false;
-                       }
-               }).setParallelism(parallelism);
+                       writeEnv.execute("Write sequence");
+                       LOG.info("Finished writing sequence");
 
-               Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
-               producerProperties.setProperty("retries", "3");
-               stream.addSink(kafkaServer.getProducer(topicName,
-                               new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(resultType, env.getConfig())),
-                               producerProperties,
-                               new Tuple2Partitioner(parallelism))).setParallelism(parallelism);
+                       // -------- Validate the Sequence --------
+
+                       // we need to validate the sequence, because kafka's producers are not exactly once
+                       LOG.info("Validating sequence");
+
+                       final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+                       readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+                       readEnv.getConfig().disableSysoutLogging();
+                       readEnv.setParallelism(parallelism);
+
+                       Properties readProps = (Properties) standardProps.clone();
+                       readProps.setProperty("group.id", "flink-tests-validator");
+
+                       FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deserSchema, readProps);
 
-               env.execute("Write sequence");
+                       readEnv
+                                       .addSource(consumer)
+                                       .map(new RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
+
+                                               private final int totalCount = parallelism * numElements;
+                                               private int count = 0;
+
+                                               @Override
+                                               public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+                                                       if (++count == totalCount) {
+                                                               throw new SuccessException();
+                                                       } else {
+                                                               return value;
+                                                       }
+                                               }
+                                       }).setParallelism(1)
+                                       .addSink(new DiscardingSink<Tuple2<Integer, Integer>>()).setParallelism(1);
+
+                       final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+                       Thread runner = new Thread() {
+                               @Override
+                               public void run() {
+                                       try {
+                                               tryExecute(readEnv, "sequence validation");
+                                       } catch (Throwable t) {
+                                               errorRef.set(t);
+                                       }
+                               }
+                       };
+                       runner.start();
+
+                       final long deadline = System.currentTimeMillis() + 10000;
+                       long delay;
+                       while (runner.isAlive() && (delay = deadline - System.currentTimeMillis()) > 0) {
+                               runner.join(delay);
+                       }
+                       
+                       boolean success;
+                       
+                       if (runner.isAlive()) {
+                               // did not finish in time, maybe the producer dropped one or more records and
+                               // the validation did not reach the exit point
+                               success = false;
+                               JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+                       }
+                       else {
+                               Throwable error = errorRef.get();
+                               if (error != null) {
+                                       success = false;
+                                       LOG.info("Attempt " + attempt + " failed with exception", error);
+                               }
+                               else {
+                                       success = true;
+                               }
+                       }
 
-               LOG.info("Finished writing sequence");
+                       JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+                       
+                       if (success) {
+                               // everything is good!
+                               return topicName;
+                       }
+                       else {
+                               deleteTestTopic(topicName);
+                               // fall through the loop
+                       }
+               }
+               
+               throw new Exception("Could not write a valid sequence to Kafka after " + maxNumAttempts + " attempts");
        }
 
        // ------------------------------------------------------------------------

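One detail worth calling out in the validation step above: if the producer dropped records, the validating consumer never reaches its expected count, so the read job never terminates on its own. The code therefore runs the job in a helper thread and joins against a deadline, cancelling the job on timeout. A condensed, self-contained sketch of that bounded join (the sleeping task stands in for tryExecute() on the read job; the 10-second deadline matches the commit):

    import java.util.concurrent.atomic.AtomicReference;

    public class BoundedJoin {

        public static void main(String[] args) throws InterruptedException {
            final AtomicReference<Throwable> errorRef = new AtomicReference<>();

            // stand-in for the "sequence validation" Flink job
            Thread runner = new Thread(() -> {
                try {
                    Thread.sleep(2000);
                } catch (Throwable t) {
                    errorRef.set(t);
                }
            });
            runner.start();

            // join, but never block past the deadline
            final long deadline = System.currentTimeMillis() + 10000;
            long delay;
            while (runner.isAlive() && (delay = deadline - System.currentTimeMillis()) > 0) {
                runner.join(delay);
            }

            // still alive means timed out: in the test this triggers job
            // cancellation and another write attempt on a fresh topic
            boolean success = !runner.isAlive() && errorRef.get() == null;
            System.out.println(success ? "validated in time" : "timed out or failed");
        }
    }
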
http://git-wip-us.apache.org/repos/asf/flink/blob/d20eda1b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index c475fad..64b9106 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -17,9 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import kafka.consumer.ConsumerConfig;
-
-
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -30,7 +27,6 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d20eda1b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
index 24822ed..f2f761d 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
@@ -31,10 +31,28 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 public class JobManagerCommunicationUtils {
-       
+
        private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
-       
-       
+
+
+       public static void waitUntilNoJobIsRunning(ActorGateway jobManager) throws Exception {
+               while (true) {
+                       // request the list of currently running jobs
+                       Future<Object> listResponse = jobManager.ask(
+                                       JobManagerMessages.getRequestRunningJobsStatus(), askTimeout);
+
+                       Object result = Await.result(listResponse, askTimeout);
+                       List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+
+
+                       if (jobs.isEmpty()) {
+                               return;
+                       }
+
+                       Thread.sleep(50);
+               }
+       }
+
        public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
                JobStatusMessage status = null;
                

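The new waitUntilNoJobIsRunning() is a plain poll-and-sleep loop: ask the JobManager for its running jobs, return once the list is empty, otherwise sleep 50 ms and ask again (the surrounding @Test timeouts bound the wait). The same pattern in isolation, with a generic condition in place of the actor ask (an illustrative simplification, not the utility's API):

    import java.util.function.BooleanSupplier;

    public class PollUntil {

        /** Re-checks a condition with a short sleep until it holds. */
        static void waitUntil(BooleanSupplier condition, long sleepMillis) throws InterruptedException {
            while (!condition.getAsBoolean()) {
                Thread.sleep(sleepMillis);
            }
        }

        public static void main(String[] args) throws InterruptedException {
            final long end = System.currentTimeMillis() + 200;
            waitUntil(() -> System.currentTimeMillis() >= end, 50); // 50 ms, like the utility
            System.out.println("no job is running (condition reached)");
        }
    }
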
http://git-wip-us.apache.org/repos/asf/flink/blob/d20eda1b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
index 0844412..c9e9ac1 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
@@ -28,12 +28,11 @@ import java.io.Serializable;
  * and that expects a specific number of partitions.
  */
 public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>> implements Serializable {
-       
+
        private static final long serialVersionUID = 1L;
 
        private final int expectedPartitions;
 
-       
        public Tuple2Partitioner(int expectedPartitions) {
                this.expectedPartitions = expectedPartitions;
        }
@@ -43,9 +42,7 @@ public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>
                if (numPartitions != expectedPartitions) {
                        throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
                }
-               @SuppressWarnings("unchecked")
-               Tuple2<Integer, Integer> element = next;
 
-               return element.f0;
+               return next.f0;
        }
 }
\ No newline at end of file
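
For context on how this partitioner ties into writeSequence(): the test source emits Tuple2(subtaskIndex, counter) and the partitioner routes each record to the Kafka partition named by f0, so partition i ends up holding exactly the sequence written by subtask i. An illustrative call, assuming this Flink version's KafkaPartitioner#partition(next, serializedKey, serializedValue, numPartitions) signature:

    import org.apache.flink.api.java.tuple.Tuple2;

    public class PartitionerDemo {
        public static void main(String[] args) {
            // expect 3 partitions; an element written by subtask 2 routes to partition 2
            Tuple2Partitioner partitioner = new Tuple2Partitioner(3);
            int target = partitioner.partition(new Tuple2<>(2, 17), null, null, 3);
            System.out.println("record goes to partition " + target); // prints 2
        }
    }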

http://git-wip-us.apache.org/repos/asf/flink/blob/d20eda1b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
index 86b5002..4413d3f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import static org.junit.Assert.fail;
 
 public class TestUtils {
+       
        public static JobExecutionResult tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
                try {
                        return see.execute(name);
@@ -45,8 +46,7 @@ public class TestUtils {
                                }
                        }
                }
+               
                return null;
        }
-
-
 }
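
tryExecute() is the other half of the validation contract: a streaming job that should stop after a fixed number of elements signals completion by throwing SuccessException (as the RichMapFunction in writeSequence() does), and the harness unwraps the failure's cause chain looking for that marker. A simplified, self-contained sketch of the idea (the Runnable harness is a stand-in for Flink job execution, not the utility's actual signature):

    public class SuccessSignal {

        /** Marker exception: "the expected element count was reached". */
        static class SuccessException extends RuntimeException {}

        /** Runs a job and treats a SuccessException anywhere in the cause chain as success. */
        static void tryExecute(Runnable job) {
            try {
                job.run();
            } catch (RuntimeException e) {
                for (Throwable t = e; t != null; t = t.getCause()) {
                    if (t instanceof SuccessException) {
                        return; // expected termination, not a failure
                    }
                }
                throw e; // a genuine error
            }
        }

        public static void main(String[] args) {
            tryExecute(() -> { throw new SuccessException(); });
            System.out.println("job signalled success");
        }
    }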
