[FLINK-7343][Kafka] Use NetworkFailureProxy in kafka tests

We shouldn't fail KafkaServers directly, because they might not be able
to flush the data. Since we don't want to test how well Kafka implements
at-least-once/exactly-once semantics, we just simulate a network failure
between Flink and Kafka in our at-least-once tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd373efe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd373efe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd373efe

Branch: refs/heads/master
Commit: cd373efe7171d71c0797a30efa256c91ca7d2714
Parents: e11a591
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Thu Aug 3 11:27:12 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Tue Aug 8 10:13:02 2017 +0200

----------------------------------------------------------------------
 .../kafka/KafkaTestEnvironmentImpl.java         |  6 +++
 .../connectors/kafka/Kafka08ITCase.java         |  7 +++
 .../kafka/KafkaTestEnvironmentImpl.java         |  8 +++-
 .../kafka/Kafka09SecuredRunITCase.java          |  4 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |  6 +++
 .../connectors/kafka/KafkaProducerTestBase.java | 49 ++++++--------------
 .../kafka/KafkaShortRetentionTestBase.java      |  2 +-
 .../connectors/kafka/KafkaTestBase.java         | 16 +++++--
 .../connectors/kafka/KafkaTestEnvironment.java  | 37 ++++++++++++++-
 9 files changed, 89 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cd373efe/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 9f1d379..5be802f 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
@@ -414,6 +415,11 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                        int kafkaPort = NetUtils.getAvailablePort();
                        kafkaProperties.put("port", 
Integer.toString(kafkaPort));
 
+                       if (config.isHideKafkaBehindProxy()) {
+                               NetworkFailuresProxy proxy = 
createProxy(KAFKA_HOST, kafkaPort);
+                               kafkaProperties.put("advertised.port", 
proxy.getLocalPort());
+                       }
+
                        //to support secure kafka cluster
                        if (config.isSecureMode()) {
                                LOG.info("Adding Kafka secure configurations");

http://git-wip-us.apache.org/repos/asf/flink/blob/cd373efe/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 91dc929..b3afa57 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import 
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.Properties;
@@ -36,6 +37,12 @@ import static org.junit.Assert.fail;
  */
 public class Kafka08ITCase extends KafkaConsumerTestBase {
 
+       @BeforeClass
+       public static void prepare() throws ClassNotFoundException {
+               // Somehow KafkaConsumer 0.8 doesn't handle broker failures if 
they are behind a proxy
+               prepare(false);
+       }
+
        // 
------------------------------------------------------------------------
        //  Suite of Tests
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cd373efe/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index af5ad67..eb1f57e 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
@@ -84,7 +85,6 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        private String zookeeperConnectionString;
        private String brokerConnectionString = "";
        private Properties standardProps;
-
        private Config config;
 
        public String getBrokerConnectionString() {
@@ -401,6 +401,12 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                for (int i = 1; i <= numTries; i++) {
                        int kafkaPort = NetUtils.getAvailablePort();
                        kafkaProperties.put("port", 
Integer.toString(kafkaPort));
+
+                       if (config.isHideKafkaBehindProxy()) {
+                               NetworkFailuresProxy proxy = 
createProxy(KAFKA_HOST, kafkaPort);
+                               kafkaProperties.put("advertised.port", 
Integer.toString(proxy.getLocalPort()));
+                       }
+
                        KafkaConfig kafkaConfig = new 
KafkaConfig(kafkaProperties);
 
                        try {

http://git-wip-us.apache.org/repos/asf/flink/blob/cd373efe/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
index d41cd91..b4002c7 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
@@ -41,11 +41,11 @@ public class Kafka09SecuredRunITCase extends 
KafkaConsumerTestBase {
                SecureTestEnvironment.prepare(tempFolder);
                
SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
 
-               startClusters(true);
+               startClusters(true, false);
        }
 
        @AfterClass
-       public static void shutDownServices() {
+       public static void shutDownServices() throws Exception {
                shutdownClusters();
                SecureTestEnvironment.cleanup();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/cd373efe/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 517f096..676e588 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
@@ -407,6 +408,11 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                        int kafkaPort = NetUtils.getAvailablePort();
                        kafkaProperties.put("port", 
Integer.toString(kafkaPort));
 
+                       if (config.isHideKafkaBehindProxy()) {
+                               NetworkFailuresProxy proxy = 
createProxy(KAFKA_HOST, kafkaPort);
+                               kafkaProperties.put("advertised.port", 
proxy.getLocalPort());
+                       }
+
                        //to support secure kafka cluster
                        if (config.isSecureMode()) {
                                LOG.info("Adding Kafka secure configurations");

http://git-wip-us.apache.org/repos/asf/flink/blob/cd373efe/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 1af9ca8..4a61103 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -46,7 +46,6 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Preconditions;
 
 import com.google.common.collect.ImmutableSet;
-import kafka.server.KafkaServer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.Test;
 
@@ -214,7 +213,8 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
 
        /**
         * This test sets KafkaProducer so that it will not automatically flush 
the data and
-        * and fails the broker to check whether FlinkKafkaProducer flushed 
records manually on snapshotState.
+        * simulate network failure between Flink and Kafka to check whether 
FlinkKafkaProducer
+        * flushed records manually on snapshotState.
         */
        protected void testOneToOneAtLeastOnce(boolean regularSink) throws 
Exception {
                final String topic = regularSink ? "oneToOneTopicRegularSink" : 
"oneToOneTopicCustomOperator";
@@ -243,13 +243,12 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
                properties.setProperty("batch.size", "10240000");
                properties.setProperty("linger.ms", "10000");
 
-               int leaderId = kafkaServer.getLeaderToShutDown(topic);
-               BrokerRestartingMapper.resetState();
+               
BrokerRestartingMapper.resetState(kafkaServer::blockProxyTraffic);
 
                // process exactly failAfterElements number of elements and 
then shutdown Kafka broker and fail application
                DataStream<Integer> inputStream = env
                        .fromCollection(getIntegersSequence(numElements))
-                       .map(new BrokerRestartingMapper<Integer>(leaderId, 
failAfterElements));
+                       .map(new BrokerRestartingMapper<>(failAfterElements));
 
                StreamSink<Integer> kafkaSink = 
kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new 
FlinkKafkaPartitioner<Integer>() {
                        @Override
@@ -276,10 +275,10 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
                        fail("Job should fail!");
                }
                catch (JobExecutionException ex) {
-                       assertEquals("Broker was shutdown!", 
ex.getCause().getMessage());
+                       // ignore error, it can be one of many errors so it 
would be hard to check the exception message/cause
                }
 
-               kafkaServer.restartBroker(leaderId);
+               kafkaServer.unblockProxyTraffic();
 
                // assert that before failure we successfully snapshot/flushed 
all expected elements
                assertAtLeastOnceForTopic(
@@ -438,22 +437,22 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
                public static volatile boolean restartedLeaderBefore;
                public static volatile boolean hasBeenCheckpointedBeforeFailure;
                public static volatile int numElementsBeforeSnapshot;
+               public static volatile Runnable shutdownAction;
 
-               private final int shutdownBrokerId;
                private final int failCount;
                private int numElementsTotal;
 
                private boolean failer;
                private boolean hasBeenCheckpointed;
 
-               public static void resetState() {
+               public static void resetState(Runnable shutdownAction) {
                        restartedLeaderBefore = false;
                        hasBeenCheckpointedBeforeFailure = false;
                        numElementsBeforeSnapshot = 0;
+                       BrokerRestartingMapper.shutdownAction = shutdownAction;
                }
 
-               public BrokerRestartingMapper(int shutdownBrokerId, int 
failCount) {
-                       this.shutdownBrokerId = shutdownBrokerId;
+               public BrokerRestartingMapper(int failCount) {
                        this.failCount = failCount;
                }
 
@@ -471,31 +470,9 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
 
                                if (failer && numElementsTotal >= failCount) {
                                        // shut down a Kafka broker
-                                       KafkaServer toShutDown = null;
-                                       for (KafkaServer server : 
kafkaServer.getBrokers()) {
-
-                                               if 
(kafkaServer.getBrokerId(server) == shutdownBrokerId) {
-                                                       toShutDown = server;
-                                                       break;
-                                               }
-                                       }
-
-                                       if (toShutDown == null) {
-                                               StringBuilder listOfBrokers = 
new StringBuilder();
-                                               for (KafkaServer server : 
kafkaServer.getBrokers()) {
-                                                       
listOfBrokers.append(kafkaServer.getBrokerId(server));
-                                                       listOfBrokers.append(" 
; ");
-                                               }
-
-                                               throw new Exception("Cannot 
find broker to shut down: " + shutdownBrokerId
-                                                                               
                + " ; available brokers: " + listOfBrokers.toString());
-                                       } else {
-                                               
hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
-                                               restartedLeaderBefore = true;
-                                               toShutDown.shutdown();
-                                               toShutDown.awaitShutdown();
-                                               throw new Exception("Broker was 
shutdown!");
-                                       }
+                                       hasBeenCheckpointedBeforeFailure = 
hasBeenCheckpointed;
+                                       restartedLeaderBefore = true;
+                                       shutdownAction.run();
                                }
                        }
                        return value;

http://git-wip-us.apache.org/repos/asf/flink/blob/cd373efe/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 3163f52..fbf902f 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -115,7 +115,7 @@ public class KafkaShortRetentionTestBase implements 
Serializable {
        }
 
        @AfterClass
-       public static void shutDownServices() {
+       public static void shutDownServices() throws Exception {
                TestStreamEnvironment.unsetAsContext();
 
                if (flink != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/cd373efe/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 8eb0351..19f38e2 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -90,18 +90,21 @@ public abstract class KafkaTestBase extends TestLogger {
 
        @BeforeClass
        public static void prepare() throws ClassNotFoundException {
+               prepare(true);
+       }
 
+       public static void prepare(boolean hideKafkaBehindProxy) throws 
ClassNotFoundException {
                
LOG.info("-------------------------------------------------------------------------");
                LOG.info("    Starting KafkaTestBase ");
                
LOG.info("-------------------------------------------------------------------------");
 
-               startClusters(false);
+               startClusters(false, hideKafkaBehindProxy);
 
                TestStreamEnvironment.setAsContext(flink, PARALLELISM);
        }
 
        @AfterClass
-       public static void shutDownServices() {
+       public static void shutDownServices() throws Exception {
 
                
LOG.info("-------------------------------------------------------------------------");
                LOG.info("    Shut down KafkaTestBase ");
@@ -127,7 +130,7 @@ public abstract class KafkaTestBase extends TestLogger {
                return flinkConfig;
        }
 
-       protected static void startClusters(boolean secureMode) throws 
ClassNotFoundException {
+       protected static void startClusters(boolean secureMode, boolean 
hideKafkaBehindProxy) throws ClassNotFoundException {
 
                // dynamically load the implementation for the test
                Class<?> clazz = 
Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
@@ -135,7 +138,10 @@ public abstract class KafkaTestBase extends TestLogger {
 
                LOG.info("Starting KafkaTestBase.prepare() for Kafka " + 
kafkaServer.getVersion());
 
-               
kafkaServer.prepare(kafkaServer.createConfig().setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS).setSecureMode(secureMode));
+               kafkaServer.prepare(kafkaServer.createConfig()
+                       .setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)
+                       .setSecureMode(secureMode)
+                       .setHideKafkaBehindProxy(hideKafkaBehindProxy));
 
                standardProps = kafkaServer.getStandardProperties();
 
@@ -154,7 +160,7 @@ public abstract class KafkaTestBase extends TestLogger {
                flink.start();
        }
 
-       protected static void shutdownClusters() {
+       protected static void shutdownClusters() throws Exception {
 
                if (flink != null) {
                        flink.shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/cd373efe/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index ea292a9..21171f8 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
@@ -29,6 +30,7 @@ import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import kafka.server.KafkaServer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -45,6 +47,7 @@ public abstract class KafkaTestEnvironment {
                private int kafkaServersNumber = 1;
                private Properties kafkaServerProperties = null;
                private boolean secureMode = false;
+               private boolean hideKafkaBehindProxy = false;
 
                /**
                 * Please use {@link KafkaTestEnvironment#createConfig()} 
method.
@@ -78,17 +81,32 @@ public abstract class KafkaTestEnvironment {
                        this.secureMode = secureMode;
                        return this;
                }
+
+               public boolean isHideKafkaBehindProxy() {
+                       return hideKafkaBehindProxy;
+               }
+
+               public Config setHideKafkaBehindProxy(boolean 
hideKafkaBehindProxy) {
+                       this.hideKafkaBehindProxy = hideKafkaBehindProxy;
+                       return this;
+               }
        }
 
        protected static final String KAFKA_HOST = "localhost";
 
+       protected final List<NetworkFailuresProxy> networkFailuresProxies = new 
ArrayList<>();
+
        public static Config createConfig() {
                return new Config();
        }
 
        public abstract void prepare(Config config);
 
-       public abstract void shutdown();
+       public void shutdown() throws Exception {
+               for (NetworkFailuresProxy proxy : networkFailuresProxies) {
+                       proxy.close();
+               }
+       }
 
        public abstract void deleteTestTopic(String topic);
 
@@ -168,4 +186,21 @@ public abstract class KafkaTestEnvironment {
 
        public abstract boolean isSecureRunSupported();
 
+       public void blockProxyTraffic() {
+               for (NetworkFailuresProxy proxy : networkFailuresProxies) {
+                       proxy.blockTraffic();
+               }
+       }
+
+       public void unblockProxyTraffic() {
+               for (NetworkFailuresProxy proxy : networkFailuresProxies) {
+                       proxy.unblockTraffic();
+               }
+       }
+
+       protected NetworkFailuresProxy createProxy(String remoteHost, int 
remotePort) {
+               NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, 
remoteHost, remotePort);
+               networkFailuresProxies.add(proxy);
+               return proxy;
+       }
 }

Reply via email to