[hotfix] [kafka consumer] Increase Kafka test stability by validating written data before consuming
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d20eda1b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d20eda1b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d20eda1b Branch: refs/heads/master Commit: d20eda1b5252d888189af29a2b493023b4621a88 Parents: af79988 Author: Stephan Ewen <[email protected]> Authored: Tue Apr 12 20:14:50 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Apr 13 01:10:54 2016 +0200 ---------------------------------------------------------------------- .../connectors/kafka/Kafka08ITCase.java | 72 ++---- .../connectors/kafka/KafkaConsumerTestBase.java | 217 +++++++++++++++---- .../connectors/kafka/KafkaTestBase.java | 4 - .../testutils/JobManagerCommunicationUtils.java | 24 +- .../kafka/testutils/Tuple2Partitioner.java | 7 +- .../org/apache/flink/test/util/TestUtils.java | 4 +- 6 files changed, 217 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d20eda1b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java index 0aef3bd..530c032 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java @@ -18,9 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.TreeCache; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.apache.curator.framework.recipes.cache.TreeCacheListener; + import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.runtime.client.JobCancellationException; import org.apache.flink.streaming.api.datastream.DataStream; @@ -33,7 +31,6 @@ import org.junit.Assert; import org.junit.Test; import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; @@ -91,16 +88,11 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { @Test(timeout = 60000) public void testInvalidOffset() throws Exception { - final String topic = "invalidOffsetTopic"; + final int parallelism = 1; - - // create topic - createTestTopic(topic, parallelism, 1); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - + // write 20 messages into topic: - writeSequence(env, topic, 20, parallelism); + final String topic = writeSequence("invalidOffsetTopic", 20, parallelism, 1); // set invalid offset: CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); @@ -110,6 +102,10 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { // read from topic final int valuesCount = 20; final int startFrom = 0; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + readSequence(env, standardProps, parallelism, topic, valuesCount, startFrom); deleteTestTopic(topic); @@ -193,10 +189,10 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { */ @Test(timeout = 60000) public void testOffsetInZookeeper() throws Exception { - final String topicName = "testOffsetInZK"; final int parallelism = 3; - createTestTopic(topicName, parallelism, 1); + // write a sequence from 0 to 99 to each of the 3 partitions. + final String topicName = writeSequence("testOffsetInZK", 100, parallelism, 1); StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); env1.getConfig().disableSysoutLogging(); @@ -210,16 +206,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { env2.getConfig().setRestartStrategy(RestartStrategies.noRestart()); env2.setParallelism(parallelism); - StreamExecutionEnvironment env3 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env3.getConfig().disableSysoutLogging(); - env3.enableCheckpointing(50); - env3.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env3.setParallelism(parallelism); - - // write a sequence from 0 to 99 to each of the 3 partitions. - writeSequence(env1, topicName, 100, parallelism); - - readSequence(env2, standardProps, parallelism, topicName, 100, 0); + readSequence(env1, standardProps, parallelism, topicName, 100, 0); CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); @@ -243,33 +230,23 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { curatorClient.close(); // create new env - readSequence(env3, standardProps, parallelism, topicName, 50, 50); + readSequence(env2, standardProps, parallelism, topicName, 50, 50); deleteTestTopic(topicName); } @Test(timeout = 60000) public void testOffsetAutocommitTest() throws Exception { - final String topicName = "testOffsetAutocommit"; final int parallelism = 3; - createTestTopic(topicName, parallelism, 1); - - StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env1.getConfig().disableSysoutLogging(); - env1.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env1.setParallelism(parallelism); - - StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - // NOTE: We are not enabling the checkpointing! - env2.getConfig().disableSysoutLogging(); - env2.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env2.setParallelism(parallelism); - - // write a sequence from 0 to 99 to each of the 3 partitions. - writeSequence(env1, topicName, 100, parallelism); + final String topicName = writeSequence("testOffsetAutocommit", 100, parallelism, 1); + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + // NOTE: We are not enabling the checkpointing! + env.getConfig().disableSysoutLogging(); + env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + env.setParallelism(parallelism); // the readSequence operation sleeps for 20 ms between each record. // setting a delay of 25*20 = 500 for the commit interval makes @@ -280,7 +257,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { readProps.setProperty("auto.commit.interval.ms", "500"); // read so that the offset can be committed to ZK - readSequence(env2, readProps, parallelism, topicName, 100, 0); + readSequence(env, readProps, parallelism, topicName, 100, 0); // get the offset CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); @@ -314,19 +291,10 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { */ @Test(timeout = 60000) public void testKafkaOffsetRetrievalToZookeeper() throws Exception { - final String topicName = "testKafkaOffsetToZk"; final int parallelism = 3; - - createTestTopic(topicName, parallelism, 1); - - StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env1.getConfig().disableSysoutLogging(); - env1.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env1.setParallelism(parallelism); // write a sequence from 0 to 49 to each of the 3 partitions. - writeSequence(env1, topicName, 50, parallelism); - + final String topicName = writeSequence("testKafkaOffsetToZk", 50, parallelism, 1); final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); env2.getConfig().disableSysoutLogging(); http://git-wip-us.apache.org/repos/asf/flink/blob/d20eda1b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index dd468a4..8ff67b4 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -35,6 +35,7 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; @@ -49,13 +50,13 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; @@ -65,7 +66,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators; -import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink; import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper; import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils; import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper; @@ -74,6 +74,7 @@ import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner; import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema; @@ -90,6 +91,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.errors.TimeoutException; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import java.io.ByteArrayInputStream; @@ -121,8 +123,22 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { @Rule public RetryRule retryRule = new RetryRule(); - + + // ------------------------------------------------------------------------ + // Common Test Preparation + // ------------------------------------------------------------------------ + + /** + * Makes sure that no job is on the JobManager any more from any previous tests that use + * the same mini cluster. Otherwise, missing slots may happen. + */ + @Before + public void ensureNoJobIsLingering() throws Exception { + JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout)); + } + + // ------------------------------------------------------------------------ // Suite of Tests // @@ -131,7 +147,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { // select which tests to run. // ------------------------------------------------------------------------ - /** * Test that ensures the KafkaConsumer is properly failing if the topic doesnt exist * and a wrong broker was specified @@ -1080,17 +1095,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { * @throws Exception */ public void runMetricsAndEndOfStreamTest() throws Exception { - final String topic = "testEndOfStream"; - createTestTopic(topic, 1, 1); - final int ELEMENT_COUNT = 300; - // write some data - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setParallelism(1); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env.getConfig().disableSysoutLogging(); - - writeSequence(env, topic, ELEMENT_COUNT, 1); + final int ELEMENT_COUNT = 300; + final String topic = writeSequence("testEndOfStream", ELEMENT_COUNT, 1, 1); // read using custom schema final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); @@ -1098,7 +1105,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { env1.getConfig().setRestartStrategy(RestartStrategies.noRestart()); env1.getConfig().disableSysoutLogging(); - DataStream<Tuple2<Integer, Integer>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), standardProps)); + DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), standardProps)); fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Void>() { @Override public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception { @@ -1106,12 +1113,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { } }); - JobExecutionResult result = tryExecute(env, "Consume " + ELEMENT_COUNT + " elements from Kafka"); + JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka"); Map<String, Object> accuResults = result.getAllAccumulatorResults(); // kafka 0.9 consumer: 39 results - if(kafkaServer.getVersion().equals("0.9")) { - Assert.assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38); + if (kafkaServer.getVersion().equals("0.9")) { + assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38); } deleteTestTopic(topic); @@ -1188,7 +1195,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { int v = values[i]; if (v != sourceParallelism) { printTopic(topicName, valuesCount, deser); - throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values)); + throw new RuntimeException("Expected v to be " + sourceParallelism + + ", but was " + v + " on element " + i + " array=" + Arrays.toString(values)); } } // test has passed @@ -1203,42 +1211,161 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { LOG.info("Successfully read sequence for verification"); } - protected void writeSequence(StreamExecutionEnvironment env, String topicName, final int numElements, int parallelism) throws Exception { + protected String writeSequence( + String baseTopicName, + final int numElements, + final int parallelism, + final int replicationFactor) throws Exception + { + LOG.info("\n===================================\n" + + "== Writing sequence of " + numElements + " into " + baseTopicName + " with p=" + parallelism + "\n" + + "==================================="); - LOG.info("\n===================================\n== Writing sequence of "+numElements+" into "+topicName+" with p="+parallelism+"\n==================================="); - TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>"); + final TypeInformation<Tuple2<Integer, Integer>> resultType = + TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {}); - DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() { + final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema = + new KeyedSerializationSchemaWrapper<>( + new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); - private boolean running = true; + final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema = + new KeyedDeserializationSchemaWrapper<>( + new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); + + final int maxNumAttempts = 10; - @Override - public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { - int cnt = 0; - int partition = getRuntimeContext().getIndexOfThisSubtask(); + for (int attempt = 1; attempt <= maxNumAttempts; attempt++) { + + final String topicName = baseTopicName + '-' + attempt; + + LOG.info("Writing attempt #1"); + + // -------- Write the Sequence -------- + + createTestTopic(topicName, parallelism, replicationFactor); - while (running && cnt < numElements) { - ctx.collect(new Tuple2<>(partition, cnt)); - cnt++; + StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + writeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + writeEnv.getConfig().disableSysoutLogging(); + + DataStream<Tuple2<Integer, Integer>> stream = writeEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() { + + private boolean running = true; + + @Override + public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { + int cnt = 0; + int partition = getRuntimeContext().getIndexOfThisSubtask(); + + while (running && cnt < numElements) { + ctx.collect(new Tuple2<>(partition, cnt)); + cnt++; + } } - } + + @Override + public void cancel() { + running = false; + } + }).setParallelism(parallelism); + + // the producer must not produce duplicates + Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); + producerProperties.setProperty("retries", "0"); + + stream.addSink(kafkaServer.getProducer( + topicName, serSchema, producerProperties, + new Tuple2Partitioner(parallelism))) + .setParallelism(parallelism); - @Override - public void cancel() { - running = false; - } - }).setParallelism(parallelism); + writeEnv.execute("Write sequence"); + LOG.info("Finished writing sequence"); - Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); - producerProperties.setProperty("retries", "3"); - stream.addSink(kafkaServer.getProducer(topicName, - new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(resultType, env.getConfig())), - producerProperties, - new Tuple2Partitioner(parallelism))).setParallelism(parallelism); + // -------- Validate the Sequence -------- + + // we need to validate the sequence, because kafka's producers are not exactly once + LOG.info("Validating sequence"); + + final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + readEnv.getConfig().disableSysoutLogging(); + readEnv.setParallelism(parallelism); + + Properties readProps = (Properties) standardProps.clone(); + readProps.setProperty("group.id", "flink-tests-validator"); + + FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deserSchema, readProps); - env.execute("Write sequence"); + readEnv + .addSource(consumer) + .map(new RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() { + + private final int totalCount = parallelism * numElements; + private int count = 0; + + @Override + public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception { + if (++count == totalCount) { + throw new SuccessException(); + } else { + return value; + } + } + }).setParallelism(1) + .addSink(new DiscardingSink<Tuple2<Integer, Integer>>()).setParallelism(1); + + final AtomicReference<Throwable> errorRef = new AtomicReference<>(); + + Thread runner = new Thread() { + @Override + public void run() { + try { + tryExecute(readEnv, "sequence validation"); + } catch (Throwable t) { + errorRef.set(t); + } + } + }; + runner.start(); + + final long deadline = System.currentTimeMillis() + 10000; + long delay; + while (runner.isAlive() && (delay = deadline - System.currentTimeMillis()) > 0) { + runner.join(delay); + } + + boolean success; + + if (runner.isAlive()) { + // did not finish in time, maybe the producer dropped one or more records and + // the validation did not reach the exit point + success = false; + JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); + } + else { + Throwable error = errorRef.get(); + if (error != null) { + success = false; + LOG.info("Attempt " + attempt + " failed with exception", error); + } + else { + success = true; + } + } - LOG.info("Finished writing sequence"); + JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout)); + + if (success) { + // everything is good! + return topicName; + } + else { + deleteTestTopic(topicName); + // fall through the loop + } + } + + throw new Exception("Could not write a valid sequence to Kafka after " + maxNumAttempts + " attempts"); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/d20eda1b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index c475fad..64b9106 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -17,9 +17,6 @@ package org.apache.flink.streaming.connectors.kafka; -import kafka.consumer.ConsumerConfig; - - import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -30,7 +27,6 @@ import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; - import org.junit.AfterClass; import org.junit.BeforeClass; http://git-wip-us.apache.org/repos/asf/flink/blob/d20eda1b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java index 24822ed..f2f761d 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java @@ -31,10 +31,28 @@ import java.util.List; import java.util.concurrent.TimeUnit; public class JobManagerCommunicationUtils { - + private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS); - - + + + public static void waitUntilNoJobIsRunning(ActorGateway jobManager) throws Exception { + while (true) { + // find the jobID + Future<Object> listResponse = jobManager.ask( + JobManagerMessages.getRequestRunningJobsStatus(), askTimeout); + + Object result = Await.result(listResponse, askTimeout); + List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages(); + + + if (jobs.isEmpty()) { + return; + } + + Thread.sleep(50); + } + } + public static void cancelCurrentJob(ActorGateway jobManager) throws Exception { JobStatusMessage status = null; http://git-wip-us.apache.org/repos/asf/flink/blob/d20eda1b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java index 0844412..c9e9ac1 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java @@ -28,12 +28,11 @@ import java.io.Serializable; * and that expects a specific number of partitions. */ public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>> implements Serializable { - + private static final long serialVersionUID = 1L; private final int expectedPartitions; - public Tuple2Partitioner(int expectedPartitions) { this.expectedPartitions = expectedPartitions; } @@ -43,9 +42,7 @@ public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer> if (numPartitions != expectedPartitions) { throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions"); } - @SuppressWarnings("unchecked") - Tuple2<Integer, Integer> element = next; - return element.f0; + return next.f0; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/d20eda1b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java index 86b5002..4413d3f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java +++ b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java @@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import static org.junit.Assert.fail; public class TestUtils { + public static JobExecutionResult tryExecute(StreamExecutionEnvironment see, String name) throws Exception { try { return see.execute(name); @@ -45,8 +46,7 @@ public class TestUtils { } } } + return null; } - - }
