This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 93c834be953f1336adb3ec5b5bf759a20e25eddf Author: Weijie Guo <[email protected]> AuthorDate: Wed Nov 16 21:24:23 2022 +0800 [FLINK-29639] Print resourceId of remote taskmanager when encounter transport exception. This closes #21331 --- .../flink/runtime/io/network/ConnectionID.java | 22 +++++++++---- .../runtime/io/network/NetworkClientHandler.java | 2 ++ .../CreditBasedPartitionRequestClientHandler.java | 37 +++++++++++++++++++-- .../network/netty/NettyPartitionRequestClient.java | 16 +++++++-- .../netty/PartitionRequestClientFactory.java | 4 +++ .../runtime/shuffle/NettyShuffleDescriptor.java | 38 +++++++++++++++------- .../ResultPartitionDeploymentDescriptorTest.java | 5 +-- .../runtime/deployment/ShuffleDescriptorTest.java | 13 ++++++-- .../netty/ClientTransportErrorHandlingTest.java | 13 +++++--- ...editBasedPartitionRequestClientHandlerTest.java | 4 +++ .../netty/NettyPartitionRequestClientTest.java | 4 ++- .../runtime/io/network/netty/NettyTestUtil.java | 4 ++- .../netty/PartitionRequestClientFactoryTest.java | 22 +++++++++---- .../partition/consumer/InputChannelBuilder.java | 3 +- .../util/NettyShuffleDescriptorBuilder.java | 8 ++--- 15 files changed, 148 insertions(+), 47 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java index 6cb0fa29f20..fb60340c673 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.executiongraph.IntermediateResult; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -43,18 +44,26 @@ public class ConnectionID implements Serializable { private final int connectionIndex; + private final ResourceID resourceID; + public ConnectionID(TaskManagerLocation connectionInfo, int connectionIndex) { this( + connectionInfo.getResourceID(), new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort()), connectionIndex); } - public ConnectionID(InetSocketAddress address, int connectionIndex) { + public ConnectionID(ResourceID resourceID, InetSocketAddress address, int connectionIndex) { + this.resourceID = checkNotNull(resourceID); this.address = checkNotNull(address); checkArgument(connectionIndex >= 0); this.connectionIndex = connectionIndex; } + public ResourceID getResourceID() { + return resourceID; + } + public InetSocketAddress getAddress() { return address; } @@ -75,15 +84,14 @@ public class ConnectionID implements Serializable { } final ConnectionID ra = (ConnectionID) other; - if (!ra.getAddress().equals(address) || ra.getConnectionIndex() != connectionIndex) { - return false; - } - - return true; + return ra.getAddress().equals(address) + && ra.getConnectionIndex() == connectionIndex + && ra.getResourceID().equals(resourceID); } @Override public String toString() { - return address + " [" + connectionIndex + "]"; + return String.format( + "%s (%s) [%s]", address, resourceID.getStringWithMetadata(), connectionIndex); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java index 18ae9e6d9a9..354da6c3a32 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java @@ -39,6 +39,8 @@ public interface NetworkClientHandler extends ChannelHandler { void cancelRequestFor(InputChannelID inputChannelId); + void setConnectionId(ConnectionID connectionId); + /** * Return whether there is channel error. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java index 00d8e6e03f8..6480b49fef7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.NetworkClientHandler; import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException; import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException; @@ -43,6 +44,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Channel handler to read the messages of buffer response or error response from the producer, to * write and flush the unannounced credits for the producer. @@ -74,6 +77,8 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap */ private volatile ChannelHandlerContext ctx; + private ConnectionID connectionID; + // ------------------------------------------------------------------------ // Input channel/receiver registration // ------------------------------------------------------------------------ @@ -128,6 +133,9 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap new RemoteTransportException( "Connection unexpectedly closed by remote task manager '" + remoteAddr + + " [ " + + connectionID.getResourceID().getStringWithMetadata() + + " ] " + "'. " + "This might indicate that the remote task manager was lost.", remoteAddr)); @@ -157,6 +165,9 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap new RemoteTransportException( "Lost connection to task manager '" + remoteAddr + + " [ " + + connectionID.getResourceID().getStringWithMetadata() + + " ] " + "'. " + "This indicates that the remote task manager was lost.", remoteAddr, @@ -166,7 +177,10 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap tex = new LocalTransportException( String.format( - "%s (connection to '%s')", cause.getMessage(), remoteAddr), + "%s (connection to '%s [%s]')", + cause.getMessage(), + remoteAddr, + connectionID.getResourceID().getStringWithMetadata()), localAddr, cause); } @@ -212,6 +226,11 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap return channelError.get() != null; } + @Override + public void setConnectionId(ConnectionID connectionId) { + this.connectionID = checkNotNull(connectionId); + } + @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { writeAndFlushNextMessageIfPossible(ctx.channel()); @@ -287,7 +306,12 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap if (error.isFatalError()) { notifyAllChannelsOfErrorAndClose( new RemoteTransportException( - "Fatal error at remote task manager '" + remoteAddr + "'.", + "Fatal error at remote task manager '" + + remoteAddr + + " [ " + + connectionID.getResourceID().getStringWithMetadata() + + " ] " + + "'.", remoteAddr, error.cause)); } else { @@ -299,7 +323,14 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap } else { inputChannel.onError( new RemoteTransportException( - "Error at remote task manager '" + remoteAddr + "'.", + "Error at remote task manager '" + + remoteAddr + + " [ " + + connectionID + .getResourceID() + .getStringWithMetadata() + + " ] " + + "'.", remoteAddr, error.cause)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java index 52e1b68d39d..de2fc747ecb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java @@ -77,6 +77,7 @@ public class NettyPartitionRequestClient implements PartitionRequestClient { this.clientHandler = checkNotNull(clientHandler); this.connectionId = checkNotNull(connectionId); this.clientFactory = checkNotNull(clientFactory); + clientHandler.setConnectionId(connectionId); } boolean canBeDisposed() { @@ -138,8 +139,11 @@ public class NettyPartitionRequestClient implements PartitionRequestClient { inputChannel.onError( new LocalTransportException( String.format( - "Sending the partition request to '%s (#%d)' failed.", + "Sending the partition request to '%s [%s] (#%d)' failed.", connectionId.getAddress(), + connectionId + .getResourceID() + .getStringWithMetadata(), connectionId.getConnectionIndex()), future.channel().localAddress(), future.cause())); @@ -197,8 +201,11 @@ public class NettyPartitionRequestClient implements PartitionRequestClient { inputChannel.onError( new LocalTransportException( String.format( - "Sending the task event to '%s (#%d)' failed.", + "Sending the task event to '%s [%s] (#%d)' failed.", connectionId.getAddress(), + connectionId + .getResourceID() + .getStringWithMetadata(), connectionId.getConnectionIndex()), future.channel().localAddress(), future.cause())); @@ -275,7 +282,10 @@ public class NettyPartitionRequestClient implements PartitionRequestClient { final SocketAddress localAddr = tcpChannel.localAddress(); final SocketAddress remoteAddr = tcpChannel.remoteAddress(); throw new LocalTransportException( - String.format("Channel to '%s' closed.", remoteAddr), localAddr); + String.format( + "Channel to '%s [%s]' closed.", + remoteAddr, connectionId.getResourceID().getStringWithMetadata()), + localAddr); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java index 7463cad9e70..ca92f052ffe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java @@ -80,6 +80,7 @@ class PartitionRequestClientFactory { // We map the input ConnectionID to a new value to restrict the number of tcp connections connectionId = new ConnectionID( + connectionId.getResourceID(), connectionId.getAddress(), connectionId.getConnectionIndex() % maxNumberOfConnections); while (true) { @@ -164,6 +165,9 @@ class PartitionRequestClientFactory { throw new RemoteTransportException( "Connecting to remote task manager '" + connectionId.getAddress() + + " [ " + + connectionId.getResourceID().getStringWithMetadata() + + " ] " + "' has failed. This might indicate that the remote task " + "manager has been lost.", connectionId.getAddress(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java index 9831e949d30..e73e305497f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java @@ -48,7 +48,10 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor { } public ConnectionID getConnectionId() { - return partitionConnectionInfo.getConnectionId(); + return new ConnectionID( + producerLocation, + partitionConnectionInfo.getAddress(), + partitionConnectionInfo.getConnectionIndex()); } @Override @@ -66,9 +69,10 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor { } /** Information for connection to partition producer for shuffle exchange. */ - @FunctionalInterface public interface PartitionConnectionInfo extends Serializable { - ConnectionID getConnectionId(); + InetSocketAddress getAddress(); + + int getConnectionIndex(); } /** @@ -81,16 +85,22 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor { private static final long serialVersionUID = 5992534320110743746L; - private final ConnectionID connectionID; + private final InetSocketAddress address; + + private final int connectionIndex; @VisibleForTesting - public NetworkPartitionConnectionInfo(ConnectionID connectionID) { - this.connectionID = connectionID; + public NetworkPartitionConnectionInfo(InetSocketAddress address, int connectionIndex) { + this.address = address; + this.connectionIndex = connectionIndex; } - @Override - public ConnectionID getConnectionId() { - return connectionID; + public InetSocketAddress getAddress() { + return address; + } + + public int getConnectionIndex() { + return connectionIndex; } static NetworkPartitionConnectionInfo fromProducerDescriptor( @@ -98,7 +108,7 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor { InetSocketAddress address = new InetSocketAddress( producerDescriptor.getAddress(), producerDescriptor.getDataPort()); - return new NetworkPartitionConnectionInfo(new ConnectionID(address, connectionIndex)); + return new NetworkPartitionConnectionInfo(address, connectionIndex); } } @@ -111,7 +121,13 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor { INSTANCE; @Override - public ConnectionID getConnectionId() { + public InetSocketAddress getAddress() { + throw new UnsupportedOperationException( + "Local execution does not support shuffle connection."); + } + + @Override + public int getConnectionIndex() { throw new UnsupportedOperationException( "Local execution does not support shuffle connection."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java index 1fe6b9e329e..3ea6def9e6a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java @@ -71,7 +71,8 @@ class ResultPartitionDeploymentDescriptorTest { private static final ResourceID producerLocation = new ResourceID("producerLocation"); private static final InetSocketAddress address = new InetSocketAddress("localhost", 10000); - private static final ConnectionID connectionID = new ConnectionID(address, connectionIndex); + private static final ConnectionID connectionID = + new ConnectionID(producerLocation, address, connectionIndex); /** Tests simple de/serialization with {@link UnknownShuffleDescriptor}. */ @Test @@ -90,7 +91,7 @@ class ResultPartitionDeploymentDescriptorTest { ShuffleDescriptor shuffleDescriptor = new NettyShuffleDescriptor( producerLocation, - new NetworkPartitionConnectionInfo(connectionID), + new NetworkPartitionConnectionInfo(address, connectionIndex), resultPartitionID); ResultPartitionDeploymentDescriptor copy = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java index bb0e96c9250..920380aad90 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java @@ -66,9 +66,10 @@ public class ShuffleDescriptorTest extends TestLogger { jobID, localPartitionId, consumerResourceID); ResultPartitionID remotePartitionId = new ResultPartitionID(); + ResourceID remoteResourceID = ResourceID.generate(); ResultPartitionDeploymentDescriptor remotePartition = createResultPartitionDeploymentDescriptor( - jobID, remotePartitionId, ResourceID.generate()); + jobID, remotePartitionId, remoteResourceID); ResultPartitionID unknownPartitionId = new ResultPartitionID(); @@ -118,7 +119,15 @@ public class ShuffleDescriptorTest extends TestLogger { remotePartitionId); nettyShuffleDescriptor = (NettyShuffleDescriptor) remoteShuffleDescriptor; assertThat(nettyShuffleDescriptor.isLocalTo(consumerResourceID), is(false)); - assertThat(nettyShuffleDescriptor.getConnectionId(), is(STUB_CONNECTION_ID)); + assertThat( + nettyShuffleDescriptor.getConnectionId().getAddress(), + is(STUB_CONNECTION_ID.getAddress())); + assertThat( + nettyShuffleDescriptor.getConnectionId().getConnectionIndex(), + is(STUB_CONNECTION_ID.getConnectionIndex())); + assertThat( + nettyShuffleDescriptor.getConnectionId().getResourceID(), + is(remoteResourceID)); } else { // Unknown (lazy deployment allowed) verifyShuffleDescriptor( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java index 5830620d893..fae2163c97e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.netty; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.NetworkClientHandler; import org.apache.flink.runtime.io.network.PartitionRequestClient; @@ -45,6 +46,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -65,6 +67,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; class ClientTransportErrorHandlingTest { + private static final ConnectionID CONNECTION_ID = + new ConnectionID(ResourceID.generate(), new InetSocketAddress("localhost", 0), 0); /** * Verifies that failed client requests via {@link PartitionRequestClient} are correctly @@ -113,10 +117,7 @@ class ClientTransportErrorHandlingTest { PartitionRequestClient requestClient = new NettyPartitionRequestClient( - ch, - handler, - mock(ConnectionID.class), - mock(PartitionRequestClientFactory.class)); + ch, handler, CONNECTION_ID, mock(PartitionRequestClientFactory.class)); // Create input channels RemoteInputChannel[] rich = @@ -396,7 +397,9 @@ class ClientTransportErrorHandlingTest { } private NetworkClientHandler getClientHandler(Channel ch) { - return ch.pipeline().get(NetworkClientHandler.class); + NetworkClientHandler networkClientHandler = ch.pipeline().get(NetworkClientHandler.class); + networkClientHandler.setConnectionId(CONNECTION_ID); + return networkClientHandler; } private RemoteInputChannel createRemoteInputChannel() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java index 1ea783d3e34..6b39979fac8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.PartitionRequestClient; import org.apache.flink.runtime.io.network.TestingConnectionManager; @@ -62,6 +63,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; +import java.net.InetSocketAddress; import static org.apache.flink.runtime.io.network.netty.PartitionRequestQueueTest.blockChannel; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel; @@ -640,6 +642,8 @@ class CreditBasedPartitionRequestClientHandlerTest { Class<? extends TransportException> expectedClass, Exception cause) { CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.setConnectionId( + new ConnectionID(ResourceID.generate(), new InetSocketAddress("localhost", 0), 0)); EmbeddedChannel embeddedChannel = new EmbeddedChannel( // A test handler to trigger the exception. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java index f2b66c42490..6ede9dbdc90 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.NetworkClientHandler; import org.apache.flink.runtime.io.network.PartitionRequestClient; @@ -287,7 +288,8 @@ public class NettyPartitionRequestClientTest { try (NetUtils.Port availablePort = NetUtils.getAvailablePort()) { int port = availablePort.getPort(); ConnectionID connectionID = - new ConnectionID(new InetSocketAddress("localhost", port), 0); + new ConnectionID( + ResourceID.generate(), new InetSocketAddress("localhost", port), 0); NettyConfig config = new NettyConfig(InetAddress.getLocalHost(), port, 1024, 1, new Configuration()); NettyClient nettyClient = new NettyClient(config); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java index 0ec9b5380da..5e4b5b68711 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.util.NetUtils; @@ -224,8 +225,9 @@ public class NettyTestUtil { return client; } - ConnectionID getConnectionID(int connectionIndex) { + ConnectionID getConnectionID(ResourceID resourceID, int connectionIndex) { return new ConnectionID( + resourceID, new InetSocketAddress( server.getConfig().getServerAddress(), server.getConfig().getServerPort()), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java index 009fe74915c..c676046913d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.netty; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.NetworkClientHandler; import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException; @@ -55,6 +56,8 @@ import static org.mockito.Mockito.mock; /** {@link PartitionRequestClientFactory} test. */ @ExtendWith(ParameterizedTestExtension.class) public class PartitionRequestClientFactoryTest extends TestLogger { + private static final ResourceID RESOURCE_ID = ResourceID.generate(); + @Parameter public boolean connectionReuseEnabled; @Parameters(name = "connectionReuseEnabled={0}") @@ -72,10 +75,11 @@ public class PartitionRequestClientFactoryTest extends TestLogger { new PartitionRequestClientFactory(nettyClient, connectionReuseEnabled); nettyClient.awaitForInterrupts = true; - connectAndInterrupt(factory, nettyServerAndClient.getConnectionID(0)); + connectAndInterrupt(factory, nettyServerAndClient.getConnectionID(RESOURCE_ID, 0)); nettyClient.awaitForInterrupts = false; - factory.createPartitionRequestClient(nettyServerAndClient.getConnectionID(0)); + factory.createPartitionRequestClient( + nettyServerAndClient.getConnectionID(RESOURCE_ID, 0)); } finally { nettyServerAndClient.client().shutdown(); nettyServerAndClient.server().shutdown(); @@ -114,7 +118,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger { new UnstableNettyClient(nettyServerAndClient.client(), 1), connectionReuseEnabled); - final ConnectionID connectionID = nettyServerAndClient.getConnectionID(0); + final ConnectionID connectionID = nettyServerAndClient.getConnectionID(RESOURCE_ID, 0); assertThatThrownBy(() -> factory.createPartitionRequestClient(connectionID)) .withFailMessage("Expected the first request to fail.") .isInstanceOf(RemoteTransportException.class); @@ -153,7 +157,8 @@ public class PartitionRequestClientFactoryTest extends TestLogger { connectionReuseEnabled); for (int i = 0; i < Math.max(100, maxNumberOfConnections); i++) { final ConnectionID connectionID = - nettyServerAndClient.getConnectionID((int) (Math.random() * Integer.MAX_VALUE)); + nettyServerAndClient.getConnectionID( + RESOURCE_ID, (int) (Math.random() * Integer.MAX_VALUE)); set.add(factory.createPartitionRequestClient(connectionID)); } assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections); @@ -169,7 +174,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger { new PartitionRequestClientFactory( unstableNettyClient, 2, 1, connectionReuseEnabled); - factory.createPartitionRequestClient(serverAndClient.getConnectionID(0)); + factory.createPartitionRequestClient(serverAndClient.getConnectionID(RESOURCE_ID, 0)); serverAndClient.client().shutdown(); serverAndClient.server().shutdown(); @@ -186,6 +191,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger { () -> factory.createPartitionRequestClient( new ConnectionID( + ResourceID.generate(), new InetSocketAddress(InetAddress.getLocalHost(), 8080), 0))); @@ -193,6 +199,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger { () -> factory.createPartitionRequestClient( new ConnectionID( + ResourceID.generate(), new InetSocketAddress( InetAddress.getLocalHost(), 8080), 0))) @@ -213,7 +220,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger { assertThatThrownBy( () -> { factory.createPartitionRequestClient( - serverAndClient.getConnectionID(0)); + serverAndClient.getConnectionID(RESOURCE_ID, 0)); }) .isInstanceOf(IOException.class); } finally { @@ -243,7 +250,8 @@ public class PartitionRequestClientFactoryTest extends TestLogger { try { client = factory.createPartitionRequestClient( - serverAndClient.getConnectionID(0)); + serverAndClient.getConnectionID( + RESOURCE_ID, 0)); } catch (Exception e) { fail(e.getMessage()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java index bb5d24a736b..622a1392a03 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; @@ -38,7 +39,7 @@ import static org.apache.flink.runtime.io.network.partition.consumer.SingleInput /** Builder for various {@link InputChannel} types. */ public class InputChannelBuilder { public static final ConnectionID STUB_CONNECTION_ID = - new ConnectionID(new InetSocketAddress("localhost", 5000), 0); + new ConnectionID(ResourceID.generate(), new InetSocketAddress("localhost", 5000), 0); private int channelIndex = 0; private ResultPartitionID partitionId = new ResultPartitionID(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java index ee641da9f99..9de1228d9d1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.util; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor; @@ -73,10 +72,11 @@ public class NettyShuffleDescriptorBuilder { } public NettyShuffleDescriptor buildRemote() { - ConnectionID connectionID = - new ConnectionID(new InetSocketAddress(address, dataPort), connectionIndex); return new NettyShuffleDescriptor( - producerLocation, new NetworkPartitionConnectionInfo(connectionID), id); + producerLocation, + new NetworkPartitionConnectionInfo( + new InetSocketAddress(address, dataPort), connectionIndex), + id); } public NettyShuffleDescriptor buildLocal() {
