Repository: flink Updated Branches: refs/heads/master e0bc37bef -> 893fabf69
[FLINK-7343][kafka-tests] Fix at-least-once test instability Previously we could set numElementsBeforeSnapshot to some value during checkpointing AFTER executing shutdown, while at the same time the FlinkKafkaProducerXXX snapshot for this value would fail. This led to an incorrectly calculated expected set of values to be present in the test Kafka topic. The fix is to remember lastSnapshotedElementBeforeShutdown - the last snapshot that we expect to succeed without failure. This closes #5729. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b87e660a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b87e660a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b87e660a Branch: refs/heads/master Commit: b87e660ac64bebbd9a0a6aa4334a68736140053f Parents: ea5342f Author: Piotr Nowojski <[email protected]> Authored: Tue Mar 20 11:23:35 2018 +0100 Committer: zentol <[email protected]> Committed: Wed Mar 21 21:01:51 2018 +0100 ---------------------------------------------------------------------- .../connectors/kafka/Kafka09ProducerITCase.java | 11 ---------- .../connectors/kafka/KafkaProducerTestBase.java | 23 ++++++++++---------- 2 files changed, 11 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b87e660a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java index c619c3e..f145e56 100644 --- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java @@ -32,15 +32,4 @@ public class Kafka09ProducerITCase extends KafkaProducerTestBase { public void testExactlyOnceCustomOperator() throws Exception { // Kafka09 does not support exactly once semantic } - - @Override - public void testOneToOneAtLeastOnceRegularSink() throws Exception { - // For some reasons this test is sometimes failing in Kafka09 while the same code works in Kafka010. Disabling - // this test because everything indicates those failures might be caused by unfixed bugs in Kafka 0.9 branch - } - - @Override - public void testOneToOneAtLeastOnceCustomOperator() throws Exception { - // Disable this test since FlinkKafka09Producer doesn't support custom operator mode - } } http://git-wip-us.apache.org/repos/asf/flink/blob/b87e660a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java index 9278b67..5023a7e 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java @@ -292,7 +292,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase { properties, topic, partition, - Collections.unmodifiableSet(new 
HashSet<>(getIntegersSequence(BrokerRestartingMapper.numElementsBeforeSnapshot))), + Collections.unmodifiableSet(new HashSet<>(getIntegersSequence(BrokerRestartingMapper.lastSnapshotedElementBeforeShutdown))), KAFKA_READ_TIMEOUT); deleteTestTopic(topic); @@ -483,7 +483,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase { private static final long serialVersionUID = 6334389850158707313L; public static volatile boolean triggeredShutdown; - public static volatile int numElementsBeforeSnapshot; + public static volatile int lastSnapshotedElementBeforeShutdown; public static volatile Runnable shutdownAction; private final int failCount; @@ -493,7 +493,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase { public static void resetState(Runnable shutdownAction) { triggeredShutdown = false; - numElementsBeforeSnapshot = 0; + lastSnapshotedElementBeforeShutdown = 0; BrokerRestartingMapper.shutdownAction = shutdownAction; } @@ -509,15 +509,12 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase { @Override public T map(T value) throws Exception { numElementsTotal++; + Thread.sleep(10); - if (!triggeredShutdown) { - Thread.sleep(10); - - if (failer && numElementsTotal >= failCount) { - // shut down a Kafka broker - triggeredShutdown = true; - shutdownAction.run(); - } + if (!triggeredShutdown && failer && numElementsTotal >= failCount) { + // shut down a Kafka broker + triggeredShutdown = true; + shutdownAction.run(); } return value; } @@ -528,7 +525,9 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase { @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { - numElementsBeforeSnapshot = numElementsTotal; + if (!triggeredShutdown) { + lastSnapshotedElementBeforeShutdown = numElementsTotal; + } } @Override
