This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3de8b3d1e023a49220e33fdbcef6061bc048bb5b Author: Fabian Paul <[email protected]> AuthorDate: Wed Dec 8 15:39:03 2021 +0100 [FLINK-25222][tests] Remove NetworkFailureProxy used in Kafka tests We suspect that the NetworkFailureProxy is causing constant connectivity problems to the brokers during testing resulting in either network timeouts or corrupted results. Since the NetworkFailureProxy is only used for testing the deprecated FlinkKafkaProducer/Consumer we can safely remove it because we will not add new features to the connectors. --- .../connectors/kafka/KafkaProducerTestBase.java | 132 ------------- .../streaming/connectors/kafka/KafkaTestBase.java | 12 +- .../connectors/kafka/KafkaTestEnvironment.java | 34 +--- .../connectors/kafka/KafkaTestEnvironmentImpl.java | 7 - .../flink/networking/NetworkFailureHandler.java | 214 --------------------- .../flink/networking/NetworkFailuresProxy.java | 125 ------------ .../flink/networking/NetworkFailuresProxyTest.java | 126 ------------ 7 files changed, 4 insertions(+), 646 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java index 155ea7e..ce7db35 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java @@ -29,7 +29,6 @@ import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -50,9 +49,7 @@ import org.junit.Test; import java.io.Serializable; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; @@ -211,135 +208,6 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink { } } - /** Tests the at-least-once semantic for the simple writes into Kafka. */ - @Test - public void testOneToOneAtLeastOnceRegularSink() throws Exception { - testOneToOneAtLeastOnce(true); - } - - /** Tests the at-least-once semantic for the simple writes into Kafka. */ - @Test - public void testOneToOneAtLeastOnceCustomOperator() throws Exception { - testOneToOneAtLeastOnce(false); - } - - /** - * This test sets KafkaProducer so that it will not automatically flush the data and simulate - * network failure between Flink and Kafka to check whether FlinkKafkaProducer flushed records - * manually on snapshotState. - * - * <p>Due to legacy reasons there are two different ways of instantiating a Kafka 0.10 sink. The - * parameter controls which method is used. - */ - protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception { - final String topic = - regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator"; - final int partition = 0; - final int numElements = 1000; - final int failAfterElements = 333; - - createTestTopic(topic, 1, 1); - - TypeInformationSerializationSchema<Integer> schema = - new TypeInformationSerializationSchema<>( - BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.enableCheckpointing(500); - env.setParallelism(1); - env.setRestartStrategy(RestartStrategies.noRestart()); - - Properties properties = new Properties(); - properties.putAll(standardProps); - properties.putAll(secureProps); - // decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer - // will try send pending (not flushed) data on close() - properties.setProperty("timeout.ms", "10000"); - // KafkaProducer prior to KIP-91 (release 2.1) uses request timeout to expire the unsent - // records. - properties.setProperty("request.timeout.ms", "3000"); - // KafkaProducer in 2.1.0 and above uses delivery timeout to expire the the records. - properties.setProperty("delivery.timeout.ms", "5000"); - properties.setProperty("max.block.ms", "10000"); - // increase batch.size and linger.ms - this tells KafkaProducer to batch produced events - // instead of flushing them immediately - properties.setProperty("batch.size", "10240000"); - properties.setProperty("linger.ms", "10000"); - // kafka producer messages guarantee - properties.setProperty("retries", "3"); - properties.setProperty("acks", "all"); - - BrokerRestartingMapper.resetState(kafkaServer::blockProxyTraffic); - - // process exactly failAfterElements number of elements and then shutdown Kafka broker and - // fail application - DataStream<Integer> inputStream = - env.addSource(new InfiniteIntegerSource()) - .map(new BrokerRestartingMapper<>(failAfterElements)); - - StreamSink<Integer> kafkaSink = - kafkaServer.getProducerSink( - topic, - schema, - properties, - new FlinkKafkaPartitioner<Integer>() { - @Override - public int partition( - Integer record, - byte[] key, - byte[] value, - String targetTopic, - int[] partitions) { - return partition; - } - }); - - if (regularSink) { - inputStream.addSink(kafkaSink.getUserFunction()); - } else { - kafkaServer.produceIntoKafka( - inputStream, - topic, - schema, - properties, - new FlinkKafkaPartitioner<Integer>() { - @Override - public int partition( - Integer record, - byte[] key, - byte[] value, - String targetTopic, - int[] partitions) { - return partition; - } - }); - } - - try { - env.execute("One-to-one at least once test"); - fail("Job should fail!"); - } catch (JobExecutionException ex) { - // ignore error, it can be one of many errors so it would be hard to check the exception - // message/cause - } finally { - kafkaServer.unblockProxyTraffic(); - } - - // assert that before failure we successfully snapshot/flushed all expected elements - assertAtLeastOnceForTopic( - properties, - topic, - partition, - Collections.unmodifiableSet( - new HashSet<>( - getIntegersSequence( - BrokerRestartingMapper - .lastSnapshotedElementBeforeShutdown))), - KAFKA_READ_TIMEOUT); - - deleteTestTopic(topic); - } - /** Tests the exactly-once semantic for the simple writes into Kafka. */ @Test public void testExactlyOnceRegularSink() throws Exception { diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index 42d8a61..5c1aa25 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -104,15 +104,11 @@ public abstract class KafkaTestBase extends TestLogger { @BeforeClass public static void prepare() throws Exception { - prepare(true); - } - - public static void prepare(boolean hideKafkaBehindProxy) throws Exception { LOG.info("-------------------------------------------------------------------------"); LOG.info(" Starting KafkaTestBase "); LOG.info("-------------------------------------------------------------------------"); - startClusters(false, hideKafkaBehindProxy); + startClusters(false); } @AfterClass @@ -147,13 +143,11 @@ public abstract class KafkaTestBase extends TestLogger { KafkaTestEnvironment.createConfig().setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)); } - public static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy) - throws Exception { + public static void startClusters(boolean secureMode) throws Exception { startClusters( KafkaTestEnvironment.createConfig() .setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS) - .setSecureMode(secureMode) - .setHideKafkaBehindProxy(hideKafkaBehindProxy)); + .setSecureMode(secureMode)); } public static void startClusters(KafkaTestEnvironment.Config environmentConfig) diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java index c6fc932..4c9269f 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java @@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; -import org.apache.flink.networking.NetworkFailuresProxy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.operators.StreamSink; @@ -31,7 +30,6 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import kafka.server.KafkaServer; import org.apache.kafka.clients.consumer.ConsumerRecord; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -45,7 +43,6 @@ public abstract class KafkaTestEnvironment { private int kafkaServersNumber = 1; private Properties kafkaServerProperties = null; private boolean secureMode = false; - private boolean hideKafkaBehindProxy = false; /** Please use {@link KafkaTestEnvironment#createConfig()} method. */ private Config() {} @@ -77,31 +74,20 @@ public abstract class KafkaTestEnvironment { return this; } - public boolean isHideKafkaBehindProxy() { - return hideKafkaBehindProxy; - } - public Config setHideKafkaBehindProxy(boolean hideKafkaBehindProxy) { - this.hideKafkaBehindProxy = hideKafkaBehindProxy; return this; } } protected static final String KAFKA_HOST = "localhost"; - protected final List<NetworkFailuresProxy> networkFailuresProxies = new ArrayList<>(); - public static Config createConfig() { return new Config(); } public abstract void prepare(Config config) throws Exception; - public void shutdown() throws Exception { - for (NetworkFailuresProxy proxy : networkFailuresProxies) { - proxy.close(); - } - } + public void shutdown() throws Exception {} public abstract void deleteTestTopic(String topic); @@ -225,24 +211,6 @@ public abstract class KafkaTestEnvironment { public abstract boolean isSecureRunSupported(); - public void blockProxyTraffic() { - for (NetworkFailuresProxy proxy : networkFailuresProxies) { - proxy.blockTraffic(); - } - } - - public void unblockProxyTraffic() { - for (NetworkFailuresProxy proxy : networkFailuresProxies) { - proxy.unblockTraffic(); - } - } - - protected NetworkFailuresProxy createProxy(String remoteHost, int remotePort) { - NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, remoteHost, remotePort); - networkFailuresProxies.add(proxy); - return proxy; - } - protected void maybePrintDanglingThreadStacktrace(String threadNameKeyword) { for (Map.Entry<Thread, StackTraceElement[]> threadEntry : Thread.getAllStackTraces().entrySet()) { diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 95e3241..141924b 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -23,7 +23,6 @@ import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.core.testutils.CommonTestUtils; -import org.apache.flink.networking.NetworkFailuresProxy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.operators.StreamSink; @@ -404,7 +403,6 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { // ignore } } - super.shutdown(); } protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception { @@ -433,11 +431,6 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { int kafkaPort = NetUtils.getAvailablePort(); kafkaProperties.put("port", Integer.toString(kafkaPort)); - if (config.isHideKafkaBehindProxy()) { - NetworkFailuresProxy proxy = createProxy(KAFKA_HOST, kafkaPort); - kafkaProperties.put("advertised.port", proxy.getLocalPort()); - } - // to support secure kafka cluster if (config.isSecureMode()) { LOG.info("Adding Kafka secure configurations"); diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java deleted file mode 100644 index f2de546..0000000 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.networking; - -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; - -/** - * Handler that is forwarding inbound traffic from the source channel to the target channel on - * remoteHost:remotePort and the responses in the opposite direction. All of the network traffic can - * be blocked at any time using blocked flag. - */ -class NetworkFailureHandler extends SimpleChannelUpstreamHandler { - private static final Logger LOG = LoggerFactory.getLogger(NetworkFailureHandler.class); - private static final String TARGET_CHANNEL_HANDLER_NAME = "target_channel_handler"; - - // mapping between source and target channels, used for finding correct target channel to use - // for given source. - private final Map<Channel, Channel> sourceToTargetChannels = new ConcurrentHashMap<>(); - private final Consumer<NetworkFailureHandler> onClose; - private final ClientSocketChannelFactory channelFactory; - private final String remoteHost; - private final int remotePort; - private final AtomicBoolean blocked; - // The set of channels that are being closed in closeOnFlush(). This is needed to avoid - // closing a channel recursively. See FLINK-22085 for more detail. - private static final Set<Channel> channelsBeingClosed = ConcurrentHashMap.newKeySet(); - - private static final ChannelFutureListener CLOSE_WITH_BOOKKEEPING = - new ChannelFutureListener() { - public void operationComplete(ChannelFuture future) { - future.getChannel() - .close() - .addListener( - channelFuture -> - channelsBeingClosed.remove(channelFuture.getChannel())); - } - }; - - public NetworkFailureHandler( - AtomicBoolean blocked, - Consumer<NetworkFailureHandler> onClose, - ClientSocketChannelFactory channelFactory, - String remoteHost, - int remotePort) { - this.blocked = blocked; - this.onClose = onClose; - this.channelFactory = channelFactory; - this.remoteHost = remoteHost; - this.remotePort = remotePort; - } - - /** Closes the specified channel after all queued write requests are flushed. */ - static void closeOnFlush(Channel channel) { - if (channel.isConnected() && !channelsBeingClosed.contains(channel)) { - channelsBeingClosed.add(channel); - channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(CLOSE_WITH_BOOKKEEPING); - } - } - - public void closeConnections() { - for (Map.Entry<Channel, Channel> entry : sourceToTargetChannels.entrySet()) { - // target channel is closed on source's channel channelClosed even - entry.getKey().close(); - } - } - - @Override - public void channelOpen(ChannelHandlerContext context, ChannelStateEvent event) - throws Exception { - // Suspend incoming traffic until connected to the remote host. - final Channel sourceChannel = event.getChannel(); - sourceChannel.setReadable(false); - - boolean isBlocked = blocked.get(); - LOG.debug( - "Attempt to open proxy channel from [{}] to [{}:{}] in state [blocked = {}]", - sourceChannel.getLocalAddress(), - remoteHost, - remotePort, - isBlocked); - - if (isBlocked) { - sourceChannel.close(); - return; - } - - // Start the connection attempt. - ClientBootstrap targetConnectionBootstrap = new ClientBootstrap(channelFactory); - targetConnectionBootstrap - .getPipeline() - .addLast( - TARGET_CHANNEL_HANDLER_NAME, - new TargetChannelHandler(event.getChannel(), blocked)); - ChannelFuture connectFuture = - targetConnectionBootstrap.connect(new InetSocketAddress(remoteHost, remotePort)); - sourceToTargetChannels.put(sourceChannel, connectFuture.getChannel()); - - connectFuture.addListener( - future -> { - if (future.isSuccess()) { - // Connection attempt succeeded: - // Begin to accept incoming traffic. - sourceChannel.setReadable(true); - } else { - // Close the connection if the connection attempt has failed. - sourceChannel.close(); - } - }); - } - - @Override - public void messageReceived(ChannelHandlerContext context, MessageEvent event) - throws Exception { - if (blocked.get()) { - return; - } - - ChannelBuffer msg = (ChannelBuffer) event.getMessage(); - Channel targetChannel = sourceToTargetChannels.get(event.getChannel()); - if (targetChannel == null) { - throw new IllegalStateException( - "Could not find a target channel for the source channel"); - } - targetChannel.write(msg); - } - - @Override - public void channelClosed(ChannelHandlerContext context, ChannelStateEvent event) - throws Exception { - Channel targetChannel = sourceToTargetChannels.get(event.getChannel()); - if (targetChannel == null) { - return; - } - closeOnFlush(targetChannel); - sourceToTargetChannels.remove(event.getChannel()); - onClose.accept(this); - } - - @Override - public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) - throws Exception { - LOG.error("Closing communication channel because of an exception", event.getCause()); - closeOnFlush(event.getChannel()); - } - - private static class TargetChannelHandler extends SimpleChannelUpstreamHandler { - private final Channel sourceChannel; - private final AtomicBoolean blocked; - - TargetChannelHandler(Channel sourceChannel, AtomicBoolean blocked) { - this.sourceChannel = sourceChannel; - this.blocked = blocked; - } - - @Override - public void messageReceived(ChannelHandlerContext context, MessageEvent event) - throws Exception { - if (blocked.get()) { - return; - } - ChannelBuffer msg = (ChannelBuffer) event.getMessage(); - sourceChannel.write(msg); - } - - @Override - public void channelClosed(ChannelHandlerContext context, ChannelStateEvent event) - throws Exception { - closeOnFlush(sourceChannel); - } - - @Override - public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) - throws Exception { - LOG.error("Closing communication channel because of an exception", event.getCause()); - closeOnFlush(event.getChannel()); - } - } -} diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java deleted file mode 100644 index 44a5ae8..0000000 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.networking; - -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * This class acts as a network proxy - listening on local port and forwarding all of the network to - * the remote host/port. It allows to simulate a network failures in the communication. - */ -public class NetworkFailuresProxy implements AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(NetworkFailuresProxy.class); - private static final String NETWORK_FAILURE_HANDLER_NAME = "network_failure_handler"; - - private final Executor executor = Executors.newCachedThreadPool(); - private final ServerBootstrap serverBootstrap; - private final Channel channel; - private final AtomicBoolean blocked = new AtomicBoolean(); - // collection of networkFailureHandlers so that we can call {@link - // NetworkFailureHandler.closeConnections} on them. - private final Set<NetworkFailureHandler> networkFailureHandlers = - Collections.newSetFromMap(new ConcurrentHashMap<>()); - - public NetworkFailuresProxy(int localPort, String remoteHost, int remotePort) { - // Configure the bootstrap. - serverBootstrap = - new ServerBootstrap(new NioServerSocketChannelFactory(executor, executor)); - - // Set up the event pipeline factory. - ClientSocketChannelFactory channelFactory = - new NioClientSocketChannelFactory(executor, executor); - serverBootstrap.setOption("child.tcpNoDelay", true); - serverBootstrap.setOption("child.keepAlive", true); - serverBootstrap.setPipelineFactory( - new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - - // synchronized for a race between blocking and creating new handlers - synchronized (networkFailureHandlers) { - NetworkFailureHandler failureHandler = - new NetworkFailureHandler( - blocked, - networkFailureHandler -> - networkFailureHandlers.remove( - networkFailureHandler), - channelFactory, - remoteHost, - remotePort); - networkFailureHandlers.add(failureHandler); - pipeline.addLast(NETWORK_FAILURE_HANDLER_NAME, failureHandler); - } - return pipeline; - } - }); - channel = serverBootstrap.bind(new InetSocketAddress(localPort)); - - LOG.info("Proxying [*:{}] to [{}:{}]", getLocalPort(), remoteHost, remotePort); - } - - /** @return local port on which {@link NetworkFailuresProxy} is listening. */ - public int getLocalPort() { - return ((InetSocketAddress) channel.getLocalAddress()).getPort(); - } - - /** Blocks all ongoing traffic, closes all ongoing and closes any new incoming connections. */ - public void blockTraffic() { - setTrafficBlocked(true); - } - - /** Resumes normal communication. */ - public void unblockTraffic() { - setTrafficBlocked(false); - } - - @Override - public void close() throws Exception { - channel.close(); - } - - private void setTrafficBlocked(boolean blocked) { - this.blocked.set(blocked); - if (blocked) { - // synchronized for a race between blocking and creating new handlers - synchronized (networkFailureHandlers) { - for (NetworkFailureHandler failureHandler : networkFailureHandlers) { - failureHandler.closeConnections(); - } - } - } - } -} diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java deleted file mode 100644 index 6d1e260..0000000 --- a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.networking; - -import org.junit.Test; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.PrintWriter; -import java.net.Socket; -import java.net.SocketException; - -import static org.junit.Assert.assertEquals; - -/** Tests for NetworkFailuresProxy. */ -public class NetworkFailuresProxyTest { - public static final int SOCKET_TIMEOUT = 500_000; - - @Test - public void testProxy() throws Exception { - try (EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT); - NetworkFailuresProxy proxy = - new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort()); - EchoClient echoClient = - new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) { - echoServer.start(); - - assertEquals("42", echoClient.write("42")); - assertEquals("Ala ma kota!", echoClient.write("Ala ma kota!")); - } - } - - @Test - public void testMultipleConnections() throws Exception { - try (EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT); - NetworkFailuresProxy proxy = - new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort()); - EchoClient echoClient1 = - new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT); - EchoClient echoClient2 = - new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) { - echoServer.start(); - - assertEquals("42", echoClient1.write("42")); - assertEquals("Ala ma kota!", echoClient2.write("Ala ma kota!")); - assertEquals("Ala hat eine Katze!", echoClient1.write("Ala hat eine Katze!")); - } - } - - @Test - public void testBlockTraffic() throws Exception { - try (EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT); - NetworkFailuresProxy proxy = - new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort())) { - echoServer.start(); - - try (EchoClient echoClient = - new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) { - assertEquals("42", echoClient.write("42")); - proxy.blockTraffic(); - try { - echoClient.write("Ala ma kota!"); - } catch (SocketException ex) { - assertEquals("Connection reset", ex.getMessage()); - } - } - - try (EchoClient echoClient = - new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) { - assertEquals(null, echoClient.write("42")); - } catch (SocketException ex) { - assertEquals("Connection reset", ex.getMessage()); - } - - proxy.unblockTraffic(); - try (EchoClient echoClient = - new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) { - assertEquals("42", echoClient.write("42")); - assertEquals("Ala ma kota!", echoClient.write("Ala ma kota!")); - } - } - } - - /** Simple echo client that sends a message over the network and waits for the answer. */ - public static class EchoClient implements AutoCloseable { - private final Socket socket; - private final PrintWriter output; - private final BufferedReader input; - - public EchoClient(String hostName, int portNumber, int socketTimeout) throws IOException { - socket = new Socket(hostName, portNumber); - socket.setSoTimeout(socketTimeout); - output = new PrintWriter(socket.getOutputStream(), true); - input = new BufferedReader(new InputStreamReader(socket.getInputStream())); - } - - public String write(String message) throws IOException { - output.println(message); - return input.readLine(); - } - - @Override - public void close() throws Exception { - input.close(); - output.close(); - socket.close(); - } - } -}
