Repository: flink
Updated Branches:
  refs/heads/master e0bc37bef -> 893fabf69


[FLINK-7343][kafka-tests] Fix test at-least-once test instability

Previously we could set numElementsBeforeSnapshot to some value during 
checkpointing
AFTER executing shutdown, while at the same time FlinkKafkaProducerXXX snapshot 
for this
value would fail. This lead to incorrectly cacluated expected set of values to 
be present
in the test kafka topic.

Fix is to remember lastSnapshotedElementBeforeShutdown - last snapshot that we 
expect
to succeed without failure.

This closes #5729.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b87e660a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b87e660a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b87e660a

Branch: refs/heads/master
Commit: b87e660ac64bebbd9a0a6aa4334a68736140053f
Parents: ea5342f
Author: Piotr Nowojski <[email protected]>
Authored: Tue Mar 20 11:23:35 2018 +0100
Committer: zentol <[email protected]>
Committed: Wed Mar 21 21:01:51 2018 +0100

----------------------------------------------------------------------
 .../connectors/kafka/Kafka09ProducerITCase.java | 11 ----------
 .../connectors/kafka/KafkaProducerTestBase.java | 23 ++++++++++----------
 2 files changed, 11 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b87e660a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
index c619c3e..f145e56 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -32,15 +32,4 @@ public class Kafka09ProducerITCase extends 
KafkaProducerTestBase {
        public void testExactlyOnceCustomOperator() throws Exception {
                // Kafka09 does not support exactly once semantic
        }
-
-       @Override
-       public void testOneToOneAtLeastOnceRegularSink() throws Exception {
-               // For some reasons this test is sometimes failing in Kafka09 
while the same code works in Kafka010. Disabling
-               // this test because everything indicates those failures might 
be caused by unfixed bugs in Kafka 0.9 branch
-       }
-
-       @Override
-       public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
-               // Disable this test since FlinkKafka09Producer doesn't support 
custom operator mode
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b87e660a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 9278b67..5023a7e 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -292,7 +292,7 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
                                properties,
                                topic,
                                partition,
-                               Collections.unmodifiableSet(new 
HashSet<>(getIntegersSequence(BrokerRestartingMapper.numElementsBeforeSnapshot))),
+                               Collections.unmodifiableSet(new 
HashSet<>(getIntegersSequence(BrokerRestartingMapper.lastSnapshotedElementBeforeShutdown))),
                                KAFKA_READ_TIMEOUT);
 
                deleteTestTopic(topic);
@@ -483,7 +483,7 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
                private static final long serialVersionUID = 
6334389850158707313L;
 
                public static volatile boolean triggeredShutdown;
-               public static volatile int numElementsBeforeSnapshot;
+               public static volatile int lastSnapshotedElementBeforeShutdown;
                public static volatile Runnable shutdownAction;
 
                private final int failCount;
@@ -493,7 +493,7 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
 
                public static void resetState(Runnable shutdownAction) {
                        triggeredShutdown = false;
-                       numElementsBeforeSnapshot = 0;
+                       lastSnapshotedElementBeforeShutdown = 0;
                        BrokerRestartingMapper.shutdownAction = shutdownAction;
                }
 
@@ -509,15 +509,12 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
                @Override
                public T map(T value) throws Exception {
                        numElementsTotal++;
+                       Thread.sleep(10);
 
-                       if (!triggeredShutdown) {
-                               Thread.sleep(10);
-
-                               if (failer && numElementsTotal >= failCount) {
-                                       // shut down a Kafka broker
-                                       triggeredShutdown = true;
-                                       shutdownAction.run();
-                               }
+                       if (!triggeredShutdown && failer && numElementsTotal >= 
failCount) {
+                               // shut down a Kafka broker
+                               triggeredShutdown = true;
+                               shutdownAction.run();
                        }
                        return value;
                }
@@ -528,7 +525,9 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
 
                @Override
                public void snapshotState(FunctionSnapshotContext context) 
throws Exception {
-                       numElementsBeforeSnapshot = numElementsTotal;
+                       if (!triggeredShutdown) {
+                               lastSnapshotedElementBeforeShutdown = 
numElementsTotal;
+                       }
                }
 
                @Override

Reply via email to