[hotfix][kafka-tests] Clean up and drop unused field in KafkaProducerTestBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea5342f8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea5342f8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea5342f8

Branch: refs/heads/master
Commit: ea5342f8a912abe79ffcd83c8c352b070df343f7
Parents: e273d5f
Author: Piotr Nowojski <[email protected]>
Authored: Tue Mar 20 11:17:18 2018 +0100
Committer: zentol <[email protected]>
Committed: Wed Mar 21 21:01:51 2018 +0100

----------------------------------------------------------------------
 .../connectors/kafka/KafkaProducerTestBase.java        | 13 ++++---------
 1 file changed, 4 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ea5342f8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 8104d8f..9278b67 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -482,8 +482,7 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
 
                private static final long serialVersionUID = 
6334389850158707313L;
 
-               public static volatile boolean restartedLeaderBefore;
-               public static volatile boolean hasBeenCheckpointedBeforeFailure;
+               public static volatile boolean triggeredShutdown;
                public static volatile int numElementsBeforeSnapshot;
                public static volatile Runnable shutdownAction;
 
@@ -491,11 +490,9 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
                private int numElementsTotal;
 
                private boolean failer;
-               private boolean hasBeenCheckpointed;
 
                public static void resetState(Runnable shutdownAction) {
-                       restartedLeaderBefore = false;
-                       hasBeenCheckpointedBeforeFailure = false;
+                       triggeredShutdown = false;
                        numElementsBeforeSnapshot = 0;
                        BrokerRestartingMapper.shutdownAction = shutdownAction;
                }
@@ -513,13 +510,12 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
                public T map(T value) throws Exception {
                        numElementsTotal++;
 
-                       if (!restartedLeaderBefore) {
+                       if (!triggeredShutdown) {
                                Thread.sleep(10);
 
                                if (failer && numElementsTotal >= failCount) {
                                        // shut down a Kafka broker
-                                       hasBeenCheckpointedBeforeFailure = 
hasBeenCheckpointed;
-                                       restartedLeaderBefore = true;
+                                       triggeredShutdown = true;
                                        shutdownAction.run();
                                }
                        }
@@ -528,7 +524,6 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
 
                @Override
                public void notifyCheckpointComplete(long checkpointId) {
-                       hasBeenCheckpointed = true;
                }
 
                @Override

Reply via email to