[FLINK-7343][kafka-tests] Fix at-least-once test instability

Previously we could set numElementsBeforeSnapshot to some value during
checkpointing AFTER executing shutdown, while at the same time the
FlinkKafkaProducerXXX snapshot for this value would fail. This led to an
incorrectly calculated expected set of values to be present in the test
Kafka topic.

The fix is to remember lastSnapshotedElementBeforeShutdown - the last snapshot
that we expect to succeed without failure.

This closes #5729.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0fa76e50
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0fa76e50
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0fa76e50

Branch: refs/heads/release-1.5
Commit: 0fa76e50f06e84da8764089f36227ed26a2c2765
Parents: 29fd6ce
Author: Piotr Nowojski <[email protected]>
Authored: Tue Mar 20 11:23:35 2018 +0100
Committer: zentol <[email protected]>
Committed: Wed Mar 21 21:01:31 2018 +0100

----------------------------------------------------------------------
 .../connectors/kafka/Kafka09ProducerITCase.java | 11 ----------
 .../connectors/kafka/KafkaProducerTestBase.java | 23 ++++++++++----------
 2 files changed, 11 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0fa76e50/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
index c619c3e..f145e56 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -32,15 +32,4 @@ public class Kafka09ProducerITCase extends 
KafkaProducerTestBase {
        public void testExactlyOnceCustomOperator() throws Exception {
                // Kafka09 does not support exactly once semantic
        }
-
-       @Override
-       public void testOneToOneAtLeastOnceRegularSink() throws Exception {
-               // For some reasons this test is sometimes failing in Kafka09 
while the same code works in Kafka010. Disabling
-               // this test because everything indicates those failures might 
be caused by unfixed bugs in Kafka 0.9 branch
-       }
-
-       @Override
-       public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
-               // Disable this test since FlinkKafka09Producer doesn't support 
custom operator mode
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0fa76e50/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 9278b67..5023a7e 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -292,7 +292,7 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
                                properties,
                                topic,
                                partition,
-                               Collections.unmodifiableSet(new 
HashSet<>(getIntegersSequence(BrokerRestartingMapper.numElementsBeforeSnapshot))),
+                               Collections.unmodifiableSet(new 
HashSet<>(getIntegersSequence(BrokerRestartingMapper.lastSnapshotedElementBeforeShutdown))),
                                KAFKA_READ_TIMEOUT);
 
                deleteTestTopic(topic);
@@ -483,7 +483,7 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
                private static final long serialVersionUID = 
6334389850158707313L;
 
                public static volatile boolean triggeredShutdown;
-               public static volatile int numElementsBeforeSnapshot;
+               public static volatile int lastSnapshotedElementBeforeShutdown;
                public static volatile Runnable shutdownAction;
 
                private final int failCount;
@@ -493,7 +493,7 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
 
                public static void resetState(Runnable shutdownAction) {
                        triggeredShutdown = false;
-                       numElementsBeforeSnapshot = 0;
+                       lastSnapshotedElementBeforeShutdown = 0;
                        BrokerRestartingMapper.shutdownAction = shutdownAction;
                }
 
@@ -509,15 +509,12 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
                @Override
                public T map(T value) throws Exception {
                        numElementsTotal++;
+                       Thread.sleep(10);
 
-                       if (!triggeredShutdown) {
-                               Thread.sleep(10);
-
-                               if (failer && numElementsTotal >= failCount) {
-                                       // shut down a Kafka broker
-                                       triggeredShutdown = true;
-                                       shutdownAction.run();
-                               }
+                       if (!triggeredShutdown && failer && numElementsTotal >= 
failCount) {
+                               // shut down a Kafka broker
+                               triggeredShutdown = true;
+                               shutdownAction.run();
                        }
                        return value;
                }
@@ -528,7 +525,9 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
 
                @Override
                public void snapshotState(FunctionSnapshotContext context) 
throws Exception {
-                       numElementsBeforeSnapshot = numElementsTotal;
+                       if (!triggeredShutdown) {
+                               lastSnapshotedElementBeforeShutdown = 
numElementsTotal;
+                       }
                }
 
                @Override

Reply via email to