This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3de8b3d1e023a49220e33fdbcef6061bc048bb5b
Author: Fabian Paul <[email protected]>
AuthorDate: Wed Dec 8 15:39:03 2021 +0100

    [FLINK-25222][tests] Remove NetworkFailuresProxy used in Kafka tests
    
    We suspect that the NetworkFailuresProxy is causing constant connectivity
    problems with the brokers during testing, resulting in either network
    timeouts or corrupted results.
    Since the NetworkFailuresProxy is only used for testing the deprecated
    FlinkKafkaProducer/Consumer, we can safely remove it because we will not
    add new features to those connectors.
---
 .../connectors/kafka/KafkaProducerTestBase.java    | 132 -------------
 .../streaming/connectors/kafka/KafkaTestBase.java  |  12 +-
 .../connectors/kafka/KafkaTestEnvironment.java     |  34 +---
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |   7 -
 .../flink/networking/NetworkFailureHandler.java    | 214 ---------------------
 .../flink/networking/NetworkFailuresProxy.java     | 125 ------------
 .../flink/networking/NetworkFailuresProxyTest.java | 126 ------------
 7 files changed, 4 insertions(+), 646 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 155ea7e..ce7db35 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -50,9 +49,7 @@ import org.junit.Test;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -211,135 +208,6 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBaseWithFlink {
         }
     }
 
-    /** Tests the at-least-once semantic for the simple writes into Kafka. */
-    @Test
-    public void testOneToOneAtLeastOnceRegularSink() throws Exception {
-        testOneToOneAtLeastOnce(true);
-    }
-
-    /** Tests the at-least-once semantic for the simple writes into Kafka. */
-    @Test
-    public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
-        testOneToOneAtLeastOnce(false);
-    }
-
-    /**
-     * This test sets KafkaProducer so that it will not automatically flush 
the data and simulate
-     * network failure between Flink and Kafka to check whether 
FlinkKafkaProducer flushed records
-     * manually on snapshotState.
-     *
-     * <p>Due to legacy reasons there are two different ways of instantiating 
a Kafka 0.10 sink. The
-     * parameter controls which method is used.
-     */
-    protected void testOneToOneAtLeastOnce(boolean regularSink) throws 
Exception {
-        final String topic =
-                regularSink ? "oneToOneTopicRegularSink" : 
"oneToOneTopicCustomOperator";
-        final int partition = 0;
-        final int numElements = 1000;
-        final int failAfterElements = 333;
-
-        createTestTopic(topic, 1, 1);
-
-        TypeInformationSerializationSchema<Integer> schema =
-                new TypeInformationSerializationSchema<>(
-                        BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.enableCheckpointing(500);
-        env.setParallelism(1);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
-        Properties properties = new Properties();
-        properties.putAll(standardProps);
-        properties.putAll(secureProps);
-        // decrease timeout and block time from 60s down to 10s - this is how 
long KafkaProducer
-        // will try send pending (not flushed) data on close()
-        properties.setProperty("timeout.ms", "10000");
-        // KafkaProducer prior to KIP-91 (release 2.1) uses request timeout to 
expire the unsent
-        // records.
-        properties.setProperty("request.timeout.ms", "3000");
-        // KafkaProducer in 2.1.0 and above uses delivery timeout to expire 
the the records.
-        properties.setProperty("delivery.timeout.ms", "5000");
-        properties.setProperty("max.block.ms", "10000");
-        // increase batch.size and linger.ms - this tells KafkaProducer to 
batch produced events
-        // instead of flushing them immediately
-        properties.setProperty("batch.size", "10240000");
-        properties.setProperty("linger.ms", "10000");
-        // kafka producer messages guarantee
-        properties.setProperty("retries", "3");
-        properties.setProperty("acks", "all");
-
-        BrokerRestartingMapper.resetState(kafkaServer::blockProxyTraffic);
-
-        // process exactly failAfterElements number of elements and then 
shutdown Kafka broker and
-        // fail application
-        DataStream<Integer> inputStream =
-                env.addSource(new InfiniteIntegerSource())
-                        .map(new BrokerRestartingMapper<>(failAfterElements));
-
-        StreamSink<Integer> kafkaSink =
-                kafkaServer.getProducerSink(
-                        topic,
-                        schema,
-                        properties,
-                        new FlinkKafkaPartitioner<Integer>() {
-                            @Override
-                            public int partition(
-                                    Integer record,
-                                    byte[] key,
-                                    byte[] value,
-                                    String targetTopic,
-                                    int[] partitions) {
-                                return partition;
-                            }
-                        });
-
-        if (regularSink) {
-            inputStream.addSink(kafkaSink.getUserFunction());
-        } else {
-            kafkaServer.produceIntoKafka(
-                    inputStream,
-                    topic,
-                    schema,
-                    properties,
-                    new FlinkKafkaPartitioner<Integer>() {
-                        @Override
-                        public int partition(
-                                Integer record,
-                                byte[] key,
-                                byte[] value,
-                                String targetTopic,
-                                int[] partitions) {
-                            return partition;
-                        }
-                    });
-        }
-
-        try {
-            env.execute("One-to-one at least once test");
-            fail("Job should fail!");
-        } catch (JobExecutionException ex) {
-            // ignore error, it can be one of many errors so it would be hard 
to check the exception
-            // message/cause
-        } finally {
-            kafkaServer.unblockProxyTraffic();
-        }
-
-        // assert that before failure we successfully snapshot/flushed all 
expected elements
-        assertAtLeastOnceForTopic(
-                properties,
-                topic,
-                partition,
-                Collections.unmodifiableSet(
-                        new HashSet<>(
-                                getIntegersSequence(
-                                        BrokerRestartingMapper
-                                                
.lastSnapshotedElementBeforeShutdown))),
-                KAFKA_READ_TIMEOUT);
-
-        deleteTestTopic(topic);
-    }
-
     /** Tests the exactly-once semantic for the simple writes into Kafka. */
     @Test
     public void testExactlyOnceRegularSink() throws Exception {
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 42d8a61..5c1aa25 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -104,15 +104,11 @@ public abstract class KafkaTestBase extends TestLogger {
 
     @BeforeClass
     public static void prepare() throws Exception {
-        prepare(true);
-    }
-
-    public static void prepare(boolean hideKafkaBehindProxy) throws Exception {
         
LOG.info("-------------------------------------------------------------------------");
         LOG.info("    Starting KafkaTestBase ");
         
LOG.info("-------------------------------------------------------------------------");
 
-        startClusters(false, hideKafkaBehindProxy);
+        startClusters(false);
     }
 
     @AfterClass
@@ -147,13 +143,11 @@ public abstract class KafkaTestBase extends TestLogger {
                 
KafkaTestEnvironment.createConfig().setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS));
     }
 
-    public static void startClusters(boolean secureMode, boolean 
hideKafkaBehindProxy)
-            throws Exception {
+    public static void startClusters(boolean secureMode) throws Exception {
         startClusters(
                 KafkaTestEnvironment.createConfig()
                         .setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)
-                        .setSecureMode(secureMode)
-                        .setHideKafkaBehindProxy(hideKafkaBehindProxy));
+                        .setSecureMode(secureMode));
     }
 
     public static void startClusters(KafkaTestEnvironment.Config 
environmentConfig)
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index c6fc932..4c9269f 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
-import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
@@ -31,7 +30,6 @@ import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import kafka.server.KafkaServer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -45,7 +43,6 @@ public abstract class KafkaTestEnvironment {
         private int kafkaServersNumber = 1;
         private Properties kafkaServerProperties = null;
         private boolean secureMode = false;
-        private boolean hideKafkaBehindProxy = false;
 
         /** Please use {@link KafkaTestEnvironment#createConfig()} method. */
         private Config() {}
@@ -77,31 +74,20 @@ public abstract class KafkaTestEnvironment {
             return this;
         }
 
-        public boolean isHideKafkaBehindProxy() {
-            return hideKafkaBehindProxy;
-        }
-
         public Config setHideKafkaBehindProxy(boolean hideKafkaBehindProxy) {
-            this.hideKafkaBehindProxy = hideKafkaBehindProxy;
             return this;
         }
     }
 
     protected static final String KAFKA_HOST = "localhost";
 
-    protected final List<NetworkFailuresProxy> networkFailuresProxies = new 
ArrayList<>();
-
     public static Config createConfig() {
         return new Config();
     }
 
     public abstract void prepare(Config config) throws Exception;
 
-    public void shutdown() throws Exception {
-        for (NetworkFailuresProxy proxy : networkFailuresProxies) {
-            proxy.close();
-        }
-    }
+    public void shutdown() throws Exception {}
 
     public abstract void deleteTestTopic(String topic);
 
@@ -225,24 +211,6 @@ public abstract class KafkaTestEnvironment {
 
     public abstract boolean isSecureRunSupported();
 
-    public void blockProxyTraffic() {
-        for (NetworkFailuresProxy proxy : networkFailuresProxies) {
-            proxy.blockTraffic();
-        }
-    }
-
-    public void unblockProxyTraffic() {
-        for (NetworkFailuresProxy proxy : networkFailuresProxies) {
-            proxy.unblockTraffic();
-        }
-    }
-
-    protected NetworkFailuresProxy createProxy(String remoteHost, int 
remotePort) {
-        NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, remoteHost, 
remotePort);
-        networkFailuresProxies.add(proxy);
-        return proxy;
-    }
-
     protected void maybePrintDanglingThreadStacktrace(String 
threadNameKeyword) {
         for (Map.Entry<Thread, StackTraceElement[]> threadEntry :
                 Thread.getAllStackTraces().entrySet()) {
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 95e3241..141924b 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -23,7 +23,6 @@ import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
 import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
@@ -404,7 +403,6 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                 // ignore
             }
         }
-        super.shutdown();
     }
 
     protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws 
Exception {
@@ -433,11 +431,6 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
             int kafkaPort = NetUtils.getAvailablePort();
             kafkaProperties.put("port", Integer.toString(kafkaPort));
 
-            if (config.isHideKafkaBehindProxy()) {
-                NetworkFailuresProxy proxy = createProxy(KAFKA_HOST, 
kafkaPort);
-                kafkaProperties.put("advertised.port", proxy.getLocalPort());
-            }
-
             // to support secure kafka cluster
             if (config.isSecureMode()) {
                 LOG.info("Adding Kafka secure configurations");
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
deleted file mode 100644
index f2de546..0000000
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.networking;
-
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
-
-/**
- * Handler that is forwarding inbound traffic from the source channel to the 
target channel on
- * remoteHost:remotePort and the responses in the opposite direction. All of 
the network traffic can
- * be blocked at any time using blocked flag.
- */
-class NetworkFailureHandler extends SimpleChannelUpstreamHandler {
-    private static final Logger LOG = 
LoggerFactory.getLogger(NetworkFailureHandler.class);
-    private static final String TARGET_CHANNEL_HANDLER_NAME = 
"target_channel_handler";
-
-    // mapping between source and target channels, used for finding correct 
target channel to use
-    // for given source.
-    private final Map<Channel, Channel> sourceToTargetChannels = new 
ConcurrentHashMap<>();
-    private final Consumer<NetworkFailureHandler> onClose;
-    private final ClientSocketChannelFactory channelFactory;
-    private final String remoteHost;
-    private final int remotePort;
-    private final AtomicBoolean blocked;
-    // The set of channels that are being closed in closeOnFlush(). This is 
needed to avoid
-    // closing a channel recursively. See FLINK-22085 for more detail.
-    private static final Set<Channel> channelsBeingClosed = 
ConcurrentHashMap.newKeySet();
-
-    private static final ChannelFutureListener CLOSE_WITH_BOOKKEEPING =
-            new ChannelFutureListener() {
-                public void operationComplete(ChannelFuture future) {
-                    future.getChannel()
-                            .close()
-                            .addListener(
-                                    channelFuture ->
-                                            
channelsBeingClosed.remove(channelFuture.getChannel()));
-                }
-            };
-
-    public NetworkFailureHandler(
-            AtomicBoolean blocked,
-            Consumer<NetworkFailureHandler> onClose,
-            ClientSocketChannelFactory channelFactory,
-            String remoteHost,
-            int remotePort) {
-        this.blocked = blocked;
-        this.onClose = onClose;
-        this.channelFactory = channelFactory;
-        this.remoteHost = remoteHost;
-        this.remotePort = remotePort;
-    }
-
-    /** Closes the specified channel after all queued write requests are 
flushed. */
-    static void closeOnFlush(Channel channel) {
-        if (channel.isConnected() && !channelsBeingClosed.contains(channel)) {
-            channelsBeingClosed.add(channel);
-            
channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(CLOSE_WITH_BOOKKEEPING);
-        }
-    }
-
-    public void closeConnections() {
-        for (Map.Entry<Channel, Channel> entry : 
sourceToTargetChannels.entrySet()) {
-            // target channel is closed on source's channel channelClosed even
-            entry.getKey().close();
-        }
-    }
-
-    @Override
-    public void channelOpen(ChannelHandlerContext context, ChannelStateEvent 
event)
-            throws Exception {
-        // Suspend incoming traffic until connected to the remote host.
-        final Channel sourceChannel = event.getChannel();
-        sourceChannel.setReadable(false);
-
-        boolean isBlocked = blocked.get();
-        LOG.debug(
-                "Attempt to open proxy channel from [{}] to [{}:{}] in state 
[blocked = {}]",
-                sourceChannel.getLocalAddress(),
-                remoteHost,
-                remotePort,
-                isBlocked);
-
-        if (isBlocked) {
-            sourceChannel.close();
-            return;
-        }
-
-        // Start the connection attempt.
-        ClientBootstrap targetConnectionBootstrap = new 
ClientBootstrap(channelFactory);
-        targetConnectionBootstrap
-                .getPipeline()
-                .addLast(
-                        TARGET_CHANNEL_HANDLER_NAME,
-                        new TargetChannelHandler(event.getChannel(), blocked));
-        ChannelFuture connectFuture =
-                targetConnectionBootstrap.connect(new 
InetSocketAddress(remoteHost, remotePort));
-        sourceToTargetChannels.put(sourceChannel, connectFuture.getChannel());
-
-        connectFuture.addListener(
-                future -> {
-                    if (future.isSuccess()) {
-                        // Connection attempt succeeded:
-                        // Begin to accept incoming traffic.
-                        sourceChannel.setReadable(true);
-                    } else {
-                        // Close the connection if the connection attempt has 
failed.
-                        sourceChannel.close();
-                    }
-                });
-    }
-
-    @Override
-    public void messageReceived(ChannelHandlerContext context, MessageEvent 
event)
-            throws Exception {
-        if (blocked.get()) {
-            return;
-        }
-
-        ChannelBuffer msg = (ChannelBuffer) event.getMessage();
-        Channel targetChannel = sourceToTargetChannels.get(event.getChannel());
-        if (targetChannel == null) {
-            throw new IllegalStateException(
-                    "Could not find a target channel for the source channel");
-        }
-        targetChannel.write(msg);
-    }
-
-    @Override
-    public void channelClosed(ChannelHandlerContext context, ChannelStateEvent 
event)
-            throws Exception {
-        Channel targetChannel = sourceToTargetChannels.get(event.getChannel());
-        if (targetChannel == null) {
-            return;
-        }
-        closeOnFlush(targetChannel);
-        sourceToTargetChannels.remove(event.getChannel());
-        onClose.accept(this);
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent 
event)
-            throws Exception {
-        LOG.error("Closing communication channel because of an exception", 
event.getCause());
-        closeOnFlush(event.getChannel());
-    }
-
-    private static class TargetChannelHandler extends 
SimpleChannelUpstreamHandler {
-        private final Channel sourceChannel;
-        private final AtomicBoolean blocked;
-
-        TargetChannelHandler(Channel sourceChannel, AtomicBoolean blocked) {
-            this.sourceChannel = sourceChannel;
-            this.blocked = blocked;
-        }
-
-        @Override
-        public void messageReceived(ChannelHandlerContext context, 
MessageEvent event)
-                throws Exception {
-            if (blocked.get()) {
-                return;
-            }
-            ChannelBuffer msg = (ChannelBuffer) event.getMessage();
-            sourceChannel.write(msg);
-        }
-
-        @Override
-        public void channelClosed(ChannelHandlerContext context, 
ChannelStateEvent event)
-                throws Exception {
-            closeOnFlush(sourceChannel);
-        }
-
-        @Override
-        public void exceptionCaught(ChannelHandlerContext context, 
ExceptionEvent event)
-                throws Exception {
-            LOG.error("Closing communication channel because of an exception", 
event.getCause());
-            closeOnFlush(event.getChannel());
-        }
-    }
-}
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java
deleted file mode 100644
index 44a5ae8..0000000
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.networking;
-
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * This class acts as a network proxy - listening on local port and forwarding 
all of the network to
- * the remote host/port. It allows to simulate a network failures in the 
communication.
- */
-public class NetworkFailuresProxy implements AutoCloseable {
-    private static final Logger LOG = 
LoggerFactory.getLogger(NetworkFailuresProxy.class);
-    private static final String NETWORK_FAILURE_HANDLER_NAME = 
"network_failure_handler";
-
-    private final Executor executor = Executors.newCachedThreadPool();
-    private final ServerBootstrap serverBootstrap;
-    private final Channel channel;
-    private final AtomicBoolean blocked = new AtomicBoolean();
-    // collection of networkFailureHandlers so that we can call {@link
-    // NetworkFailureHandler.closeConnections} on them.
-    private final Set<NetworkFailureHandler> networkFailureHandlers =
-            Collections.newSetFromMap(new ConcurrentHashMap<>());
-
-    public NetworkFailuresProxy(int localPort, String remoteHost, int 
remotePort) {
-        // Configure the bootstrap.
-        serverBootstrap =
-                new ServerBootstrap(new 
NioServerSocketChannelFactory(executor, executor));
-
-        // Set up the event pipeline factory.
-        ClientSocketChannelFactory channelFactory =
-                new NioClientSocketChannelFactory(executor, executor);
-        serverBootstrap.setOption("child.tcpNoDelay", true);
-        serverBootstrap.setOption("child.keepAlive", true);
-        serverBootstrap.setPipelineFactory(
-                new ChannelPipelineFactory() {
-                    public ChannelPipeline getPipeline() throws Exception {
-                        ChannelPipeline pipeline = Channels.pipeline();
-
-                        // synchronized for a race between blocking and 
creating new handlers
-                        synchronized (networkFailureHandlers) {
-                            NetworkFailureHandler failureHandler =
-                                    new NetworkFailureHandler(
-                                            blocked,
-                                            networkFailureHandler ->
-                                                    
networkFailureHandlers.remove(
-                                                            
networkFailureHandler),
-                                            channelFactory,
-                                            remoteHost,
-                                            remotePort);
-                            networkFailureHandlers.add(failureHandler);
-                            pipeline.addLast(NETWORK_FAILURE_HANDLER_NAME, 
failureHandler);
-                        }
-                        return pipeline;
-                    }
-                });
-        channel = serverBootstrap.bind(new InetSocketAddress(localPort));
-
-        LOG.info("Proxying [*:{}] to [{}:{}]", getLocalPort(), remoteHost, 
remotePort);
-    }
-
-    /** @return local port on which {@link NetworkFailuresProxy} is listening. 
*/
-    public int getLocalPort() {
-        return ((InetSocketAddress) channel.getLocalAddress()).getPort();
-    }
-
-    /** Blocks all ongoing traffic, closes all ongoing and closes any new 
incoming connections. */
-    public void blockTraffic() {
-        setTrafficBlocked(true);
-    }
-
-    /** Resumes normal communication. */
-    public void unblockTraffic() {
-        setTrafficBlocked(false);
-    }
-
-    @Override
-    public void close() throws Exception {
-        channel.close();
-    }
-
-    private void setTrafficBlocked(boolean blocked) {
-        this.blocked.set(blocked);
-        if (blocked) {
-            // synchronized for a race between blocking and creating new 
handlers
-            synchronized (networkFailureHandlers) {
-                for (NetworkFailureHandler failureHandler : 
networkFailureHandlers) {
-                    failureHandler.closeConnections();
-                }
-            }
-        }
-    }
-}
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java
 
b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java
deleted file mode 100644
index 6d1e260..0000000
--- 
a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.networking;
-
-import org.junit.Test;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.net.Socket;
-import java.net.SocketException;
-
-import static org.junit.Assert.assertEquals;
-
-/** Tests for NetworkFailuresProxy. */
-public class NetworkFailuresProxyTest {
-    public static final int SOCKET_TIMEOUT = 500_000;
-
-    @Test
-    public void testProxy() throws Exception {
-        try (EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT);
-                NetworkFailuresProxy proxy =
-                        new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort());
-                EchoClient echoClient =
-                        new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
-            echoServer.start();
-
-            assertEquals("42", echoClient.write("42"));
-            assertEquals("Ala ma kota!", echoClient.write("Ala ma kota!"));
-        }
-    }
-
-    @Test
-    public void testMultipleConnections() throws Exception {
-        try (EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT);
-                NetworkFailuresProxy proxy =
-                        new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort());
-                EchoClient echoClient1 =
-                        new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT);
-                EchoClient echoClient2 =
-                        new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
-            echoServer.start();
-
-            assertEquals("42", echoClient1.write("42"));
-            assertEquals("Ala ma kota!", echoClient2.write("Ala ma kota!"));
-            assertEquals("Ala hat eine Katze!", echoClient1.write("Ala hat eine Katze!"));
-        }
-    }
-
-    @Test
-    public void testBlockTraffic() throws Exception {
-        try (EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT);
-                NetworkFailuresProxy proxy =
-                        new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort())) {
-            echoServer.start();
-
-            try (EchoClient echoClient =
-                    new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
-                assertEquals("42", echoClient.write("42"));
-                proxy.blockTraffic();
-                try {
-                    echoClient.write("Ala ma kota!");
-                } catch (SocketException ex) {
-                    assertEquals("Connection reset", ex.getMessage());
-                }
-            }
-
-            try (EchoClient echoClient =
-                    new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
-                assertEquals(null, echoClient.write("42"));
-            } catch (SocketException ex) {
-                assertEquals("Connection reset", ex.getMessage());
-            }
-
-            proxy.unblockTraffic();
-            try (EchoClient echoClient =
-                    new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
-                assertEquals("42", echoClient.write("42"));
-                assertEquals("Ala ma kota!", echoClient.write("Ala ma kota!"));
-            }
-        }
-    }
-
-    /** Simple echo client that sends a message over the network and waits for the answer. */
-    public static class EchoClient implements AutoCloseable {
-        private final Socket socket;
-        private final PrintWriter output;
-        private final BufferedReader input;
-
-        public EchoClient(String hostName, int portNumber, int socketTimeout) throws IOException {
-            socket = new Socket(hostName, portNumber);
-            socket.setSoTimeout(socketTimeout);
-            output = new PrintWriter(socket.getOutputStream(), true);
-            input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-        }
-
-        public String write(String message) throws IOException {
-            output.println(message);
-            return input.readLine();
-        }
-
-        @Override
-        public void close() throws Exception {
-            input.close();
-            output.close();
-            socket.close();
-        }
-    }
-}

Reply via email to