[hotfix][kafka-tests] Clean up and drop unused field in KafkaProducerTestBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/29fd6ce3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/29fd6ce3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/29fd6ce3

Branch: refs/heads/release-1.5
Commit: 29fd6ce316d26c85647cc2b341fef20d983b420f
Parents: 2b85b46
Author: Piotr Nowojski <[email protected]>
Authored: Tue Mar 20 11:17:18 2018 +0100
Committer: zentol <[email protected]>
Committed: Wed Mar 21 21:01:31 2018 +0100

----------------------------------------------------------------------
 .../connectors/kafka/KafkaProducerTestBase.java        | 13 ++++---------
 1 file changed, 4 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/29fd6ce3/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 8104d8f..9278b67 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -482,8 +482,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
                private static final long serialVersionUID = 6334389850158707313L;
 
-               public static volatile boolean restartedLeaderBefore;
-               public static volatile boolean hasBeenCheckpointedBeforeFailure;
+               public static volatile boolean triggeredShutdown;
                public static volatile int numElementsBeforeSnapshot;
                public static volatile Runnable shutdownAction;
 
@@ -491,11 +490,9 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
                private int numElementsTotal;
 
                private boolean failer;
-               private boolean hasBeenCheckpointed;
 
                public static void resetState(Runnable shutdownAction) {
-                       restartedLeaderBefore = false;
-                       hasBeenCheckpointedBeforeFailure = false;
+                       triggeredShutdown = false;
                        numElementsBeforeSnapshot = 0;
                        BrokerRestartingMapper.shutdownAction = shutdownAction;
                }
@@ -513,13 +510,12 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
                public T map(T value) throws Exception {
                        numElementsTotal++;
 
-                       if (!restartedLeaderBefore) {
+                       if (!triggeredShutdown) {
                                Thread.sleep(10);
 
                                if (failer && numElementsTotal >= failCount) {
                                        // shut down a Kafka broker
-                                       hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
-                                       restartedLeaderBefore = true;
+                                       triggeredShutdown = true;
                                        shutdownAction.run();
                                }
                        }
@@ -528,7 +524,6 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
                @Override
                public void notifyCheckpointComplete(long checkpointId) {
-                       hasBeenCheckpointed = true;
                }
 
                @Override

Reply via email to