This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d4a50a7467ea64840c4a24f5291613d2f7a3e1d5 Author: 1996fanrui <[email protected]> AuthorDate: Sun Nov 20 15:37:44 2022 +0800 [FLINK-28695][refactor][network] Refactor the nettyServerAndClient shutdown and some lambdas --- .../network/netty/NettyPartitionRequestClient.java | 98 ++++++++++------------ .../netty/PartitionRequestClientFactoryTest.java | 26 +++--- 2 files changed, 55 insertions(+), 69 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java index 2bfa28b6040..cbd823c134c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java @@ -131,29 +131,26 @@ public class NettyPartitionRequestClient implements PartitionRequestClient { inputChannel.getInitialCredit()); final ChannelFutureListener listener = - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) { - if (!future.isSuccess()) { - clientHandler.removeInputChannel(inputChannel); - inputChannel.onError( - new LocalTransportException( - String.format( - "Sending the partition request to '%s [%s] (#%d)' failed.", - connectionId.getAddress(), - connectionId - .getResourceID() - .getStringWithMetadata(), - connectionId.getConnectionIndex()), - future.channel().localAddress(), - future.cause())); - sendToChannel( - new ConnectionErrorMessage( - future.cause() == null - ? new RuntimeException( - "Cannot send partition request.") - : future.cause())); - } + future -> { + if (!future.isSuccess()) { + clientHandler.removeInputChannel(inputChannel); + inputChannel.onError( + new LocalTransportException( + String.format( + "Sending the partition request to '%s [%s] (#%d)' failed.", + connectionId.getAddress(), + connectionId + .getResourceID() + .getStringWithMetadata(), + connectionId.getConnectionIndex()), + future.channel().localAddress(), + future.cause())); + sendToChannel( + new ConnectionErrorMessage( + future.cause() == null + ? new RuntimeException( + "Cannot send partition request.") + : future.cause())); } }; @@ -165,12 +162,9 @@ public class NettyPartitionRequestClient implements PartitionRequestClient { tcpChannel .eventLoop() .schedule( - new Runnable() { - @Override - public void run() { - f[0] = tcpChannel.writeAndFlush(request); - f[0].addListener(listener); - } + () -> { + f[0] = tcpChannel.writeAndFlush(request); + f[0].addListener(listener); }, delayMs, TimeUnit.MILLISECONDS); @@ -194,30 +188,28 @@ public class NettyPartitionRequestClient implements PartitionRequestClient { .writeAndFlush( new TaskEventRequest(event, partitionId, inputChannel.getInputChannelId())) .addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) { - if (!future.isSuccess()) { - inputChannel.onError( - new LocalTransportException( - String.format( - "Sending the task event to '%s [%s] (#%d)' failed.", - connectionId.getAddress(), - connectionId - .getResourceID() - .getStringWithMetadata(), - connectionId.getConnectionIndex()), - future.channel().localAddress(), - future.cause())); - sendToChannel( - new ConnectionErrorMessage( - future.cause() == null - ? new RuntimeException( - "Cannot send task event.") - : future.cause())); - } - } - }); + (ChannelFutureListener) + future -> { + if (!future.isSuccess()) { + inputChannel.onError( + new LocalTransportException( + String.format( + "Sending the task event to '%s [%s] (#%d)' failed.", + connectionId.getAddress(), + connectionId + .getResourceID() + .getStringWithMetadata(), + connectionId.getConnectionIndex()), + future.channel().localAddress(), + future.cause())); + sendToChannel( + new ConnectionErrorMessage( + future.cause() == null + ? new RuntimeException( + "Cannot send task event.") + : future.cause())); + } + }); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java index f8ba8118a77..56110d95cbc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java @@ -48,6 +48,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.fail; @@ -81,8 +82,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger { factory.createPartitionRequestClient( nettyServerAndClient.getConnectionID(RESOURCE_ID, 0)); } finally { - nettyServerAndClient.client().shutdown(); - nettyServerAndClient.server().shutdown(); + shutdown(nettyServerAndClient); } } @@ -125,8 +125,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger { factory.createPartitionRequestClient(connectionID); } finally { - nettyServerAndClient.client().shutdown(); - nettyServerAndClient.server().shutdown(); + shutdown(nettyServerAndClient); } } @@ -139,8 +138,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger { checkReuseNettyPartitionRequestClient(nettyServerAndClient, 5); checkReuseNettyPartitionRequestClient(nettyServerAndClient, 10); } finally { - nettyServerAndClient.client().shutdown(); - nettyServerAndClient.server().shutdown(); + shutdown(nettyServerAndClient); } } @@ -176,8 +174,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger { factory.createPartitionRequestClient(serverAndClient.getConnectionID(RESOURCE_ID, 0)); - serverAndClient.client().shutdown(); - serverAndClient.server().shutdown(); + shutdown(serverAndClient); } // see https://issues.apache.org/jira/browse/FLINK-18821 @@ -218,14 +215,12 @@ public class PartitionRequestClientFactoryTest extends TestLogger { unstableNettyClient, 2, 1, connectionReuseEnabled); assertThatThrownBy( - () -> { - factory.createPartitionRequestClient( - serverAndClient.getConnectionID(RESOURCE_ID, 0)); - }) + () -> + factory.createPartitionRequestClient( + serverAndClient.getConnectionID(RESOURCE_ID, 0))) .isInstanceOf(IOException.class); } finally { - serverAndClient.client().shutdown(); - serverAndClient.server().shutdown(); + shutdown(serverAndClient); } } @@ -274,8 +269,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger { }); threadPoolExecutor.shutdown(); - serverAndClient.client().shutdown(); - serverAndClient.server().shutdown(); + shutdown(serverAndClient); } private NettyTestUtil.NettyServerAndClient createNettyServerAndClient() throws Exception {
