[hotfix][kafka-tests] Clean up and drop unused field in KafkaProducerTestBase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea5342f8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea5342f8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea5342f8

Branch: refs/heads/master
Commit: ea5342f8a912abe79ffcd83c8c352b070df343f7
Parents: e273d5f
Author: Piotr Nowojski <[email protected]>
Authored: Tue Mar 20 11:17:18 2018 +0100
Committer: zentol <[email protected]>
Committed: Wed Mar 21 21:01:51 2018 +0100

----------------------------------------------------------------------
 .../connectors/kafka/KafkaProducerTestBase.java | 13 ++++---------
 1 file changed, 4 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ea5342f8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 8104d8f..9278b67 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -482,8 +482,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
 		private static final long serialVersionUID = 6334389850158707313L;
 
-		public static volatile boolean restartedLeaderBefore;
-		public static volatile boolean hasBeenCheckpointedBeforeFailure;
+		public static volatile boolean triggeredShutdown;
 		public static volatile int numElementsBeforeSnapshot;
 		public static volatile Runnable shutdownAction;
 
@@ -491,11 +490,9 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 		private int numElementsTotal;
 
 		private boolean failer;
-		private boolean hasBeenCheckpointed;
 
 		public static void resetState(Runnable shutdownAction) {
-			restartedLeaderBefore = false;
-			hasBeenCheckpointedBeforeFailure = false;
+			triggeredShutdown = false;
 			numElementsBeforeSnapshot = 0;
 			BrokerRestartingMapper.shutdownAction = shutdownAction;
 		}
@@ -513,13 +510,12 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 		public T map(T value) throws Exception {
 			numElementsTotal++;
 
-			if (!restartedLeaderBefore) {
+			if (!triggeredShutdown) {
 				Thread.sleep(10);
 
 				if (failer && numElementsTotal >= failCount) {
 					// shut down a Kafka broker
-					hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
-					restartedLeaderBefore = true;
+					triggeredShutdown = true;
 					shutdownAction.run();
 				}
 			}
@@ -528,7 +524,6 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
 		@Override
 		public void notifyCheckpointComplete(long checkpointId) {
-			hasBeenCheckpointed = true;
 		}
 
 		@Override
