Repository: flink
Updated Branches:
  refs/heads/master 6cc1c179a -> ccc86e9f1
[FLINK-3530] Fix Kafka08 instability: Avoid restarts from SuccessException

This closes #2080

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ccc86e9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ccc86e9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ccc86e9f

Branch: refs/heads/master
Commit: ccc86e9f1cddebc731ac1ccabdd469df11d72d8b
Parents: 6cc1c17
Author: Robert Metzger <[email protected]>
Authored: Tue Jun 7 18:20:17 2016 +0200
Committer: Robert Metzger <[email protected]>
Committed: Thu Jun 9 15:18:09 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/kafka/KafkaConsumerTestBase.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ccc86e9f/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 3ba8cff..660f24c 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -357,7 +357,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env.enableCheckpointing(500);
 		env.setParallelism(parallelism);
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 		env.getConfig().disableSysoutLogging();
 
 		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
@@ -402,7 +402,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env.enableCheckpointing(500);
 		env.setParallelism(parallelism);
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 		env.getConfig().disableSysoutLogging();
 
 		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
@@ -447,7 +447,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env.enableCheckpointing(500);
 		env.setParallelism(parallelism);
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+		// set the number of restarts to one. The failing mapper will fail once, then it's only success exceptions.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 		env.getConfig().disableSysoutLogging();
 		env.setBufferTimeout(0);
 
