This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 93c834be953f1336adb3ec5b5bf759a20e25eddf
Author: Weijie Guo <[email protected]>
AuthorDate: Wed Nov 16 21:24:23 2022 +0800

    [FLINK-29639] Print resourceId of remote taskmanager when encounter 
transport exception.
    
    This closes #21331
---
 .../flink/runtime/io/network/ConnectionID.java     | 22 +++++++++----
 .../runtime/io/network/NetworkClientHandler.java   |  2 ++
 .../CreditBasedPartitionRequestClientHandler.java  | 37 +++++++++++++++++++--
 .../network/netty/NettyPartitionRequestClient.java | 16 +++++++--
 .../netty/PartitionRequestClientFactory.java       |  4 +++
 .../runtime/shuffle/NettyShuffleDescriptor.java    | 38 +++++++++++++++-------
 .../ResultPartitionDeploymentDescriptorTest.java   |  5 +--
 .../runtime/deployment/ShuffleDescriptorTest.java  | 13 ++++++--
 .../netty/ClientTransportErrorHandlingTest.java    | 13 +++++---
 ...editBasedPartitionRequestClientHandlerTest.java |  4 +++
 .../netty/NettyPartitionRequestClientTest.java     |  4 ++-
 .../runtime/io/network/netty/NettyTestUtil.java    |  4 ++-
 .../netty/PartitionRequestClientFactoryTest.java   | 22 +++++++++----
 .../partition/consumer/InputChannelBuilder.java    |  3 +-
 .../util/NettyShuffleDescriptorBuilder.java        |  8 ++---
 15 files changed, 148 insertions(+), 47 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
index 6cb0fa29f20..fb60340c673 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
@@ -43,18 +44,26 @@ public class ConnectionID implements Serializable {
 
     private final int connectionIndex;
 
+    private final ResourceID resourceID;
+
     public ConnectionID(TaskManagerLocation connectionInfo, int 
connectionIndex) {
         this(
+                connectionInfo.getResourceID(),
                 new InetSocketAddress(connectionInfo.address(), 
connectionInfo.dataPort()),
                 connectionIndex);
     }
 
-    public ConnectionID(InetSocketAddress address, int connectionIndex) {
+    public ConnectionID(ResourceID resourceID, InetSocketAddress address, int 
connectionIndex) {
+        this.resourceID = checkNotNull(resourceID);
         this.address = checkNotNull(address);
         checkArgument(connectionIndex >= 0);
         this.connectionIndex = connectionIndex;
     }
 
+    public ResourceID getResourceID() {
+        return resourceID;
+    }
+
     public InetSocketAddress getAddress() {
         return address;
     }
@@ -75,15 +84,14 @@ public class ConnectionID implements Serializable {
         }
 
         final ConnectionID ra = (ConnectionID) other;
-        if (!ra.getAddress().equals(address) || ra.getConnectionIndex() != 
connectionIndex) {
-            return false;
-        }
-
-        return true;
+        return ra.getAddress().equals(address)
+                && ra.getConnectionIndex() == connectionIndex
+                && ra.getResourceID().equals(resourceID);
     }
 
     @Override
     public String toString() {
-        return address + " [" + connectionIndex + "]";
+        return String.format(
+                "%s (%s) [%s]", address, resourceID.getStringWithMetadata(), 
connectionIndex);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
index 18ae9e6d9a9..354da6c3a32 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
@@ -39,6 +39,8 @@ public interface NetworkClientHandler extends ChannelHandler {
 
     void cancelRequestFor(InputChannelID inputChannelId);
 
+    void setConnectionId(ConnectionID connectionId);
+
     /**
      * Return whether there is channel error.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 00d8e6e03f8..6480b49fef7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import 
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
@@ -43,6 +44,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Channel handler to read the messages of buffer response or error response 
from the producer, to
  * write and flush the unannounced credits for the producer.
@@ -74,6 +77,8 @@ class CreditBasedPartitionRequestClientHandler extends 
ChannelInboundHandlerAdap
      */
     private volatile ChannelHandlerContext ctx;
 
+    private ConnectionID connectionID;
+
     // ------------------------------------------------------------------------
     // Input channel/receiver registration
     // ------------------------------------------------------------------------
@@ -128,6 +133,9 @@ class CreditBasedPartitionRequestClientHandler extends 
ChannelInboundHandlerAdap
                     new RemoteTransportException(
                             "Connection unexpectedly closed by remote task 
manager '"
                                     + remoteAddr
+                                    + " [ "
+                                    + 
connectionID.getResourceID().getStringWithMetadata()
+                                    + " ] "
                                     + "'. "
                                     + "This might indicate that the remote 
task manager was lost.",
                             remoteAddr));
@@ -157,6 +165,9 @@ class CreditBasedPartitionRequestClientHandler extends 
ChannelInboundHandlerAdap
                         new RemoteTransportException(
                                 "Lost connection to task manager '"
                                         + remoteAddr
+                                        + " [ "
+                                        + 
connectionID.getResourceID().getStringWithMetadata()
+                                        + " ] "
                                         + "'. "
                                         + "This indicates that the remote task 
manager was lost.",
                                 remoteAddr,
@@ -166,7 +177,10 @@ class CreditBasedPartitionRequestClientHandler extends 
ChannelInboundHandlerAdap
                 tex =
                         new LocalTransportException(
                                 String.format(
-                                        "%s (connection to '%s')", 
cause.getMessage(), remoteAddr),
+                                        "%s (connection to '%s [%s]')",
+                                        cause.getMessage(),
+                                        remoteAddr,
+                                        
connectionID.getResourceID().getStringWithMetadata()),
                                 localAddr,
                                 cause);
             }
@@ -212,6 +226,11 @@ class CreditBasedPartitionRequestClientHandler extends 
ChannelInboundHandlerAdap
         return channelError.get() != null;
     }
 
+    @Override
+    public void setConnectionId(ConnectionID connectionId) {
+        this.connectionID = checkNotNull(connectionId);
+    }
+
     @Override
     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws 
Exception {
         writeAndFlushNextMessageIfPossible(ctx.channel());
@@ -287,7 +306,12 @@ class CreditBasedPartitionRequestClientHandler extends 
ChannelInboundHandlerAdap
             if (error.isFatalError()) {
                 notifyAllChannelsOfErrorAndClose(
                         new RemoteTransportException(
-                                "Fatal error at remote task manager '" + 
remoteAddr + "'.",
+                                "Fatal error at remote task manager '"
+                                        + remoteAddr
+                                        + " [ "
+                                        + 
connectionID.getResourceID().getStringWithMetadata()
+                                        + " ] "
+                                        + "'.",
                                 remoteAddr,
                                 error.cause));
             } else {
@@ -299,7 +323,14 @@ class CreditBasedPartitionRequestClientHandler extends 
ChannelInboundHandlerAdap
                     } else {
                         inputChannel.onError(
                                 new RemoteTransportException(
-                                        "Error at remote task manager '" + 
remoteAddr + "'.",
+                                        "Error at remote task manager '"
+                                                + remoteAddr
+                                                + " [ "
+                                                + connectionID
+                                                        .getResourceID()
+                                                        
.getStringWithMetadata()
+                                                + " ] "
+                                                + "'.",
                                         remoteAddr,
                                         error.cause));
                     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
index 52e1b68d39d..de2fc747ecb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
@@ -77,6 +77,7 @@ public class NettyPartitionRequestClient implements 
PartitionRequestClient {
         this.clientHandler = checkNotNull(clientHandler);
         this.connectionId = checkNotNull(connectionId);
         this.clientFactory = checkNotNull(clientFactory);
+        clientHandler.setConnectionId(connectionId);
     }
 
     boolean canBeDisposed() {
@@ -138,8 +139,11 @@ public class NettyPartitionRequestClient implements 
PartitionRequestClient {
                             inputChannel.onError(
                                     new LocalTransportException(
                                             String.format(
-                                                    "Sending the partition 
request to '%s (#%d)' failed.",
+                                                    "Sending the partition 
request to '%s [%s] (#%d)' failed.",
                                                     connectionId.getAddress(),
+                                                    connectionId
+                                                            .getResourceID()
+                                                            
.getStringWithMetadata(),
                                                     
connectionId.getConnectionIndex()),
                                             future.channel().localAddress(),
                                             future.cause()));
@@ -197,8 +201,11 @@ public class NettyPartitionRequestClient implements 
PartitionRequestClient {
                                     inputChannel.onError(
                                             new LocalTransportException(
                                                     String.format(
-                                                            "Sending the task 
event to '%s (#%d)' failed.",
+                                                            "Sending the task 
event to '%s [%s] (#%d)' failed.",
                                                             
connectionId.getAddress(),
+                                                            connectionId
+                                                                    
.getResourceID()
+                                                                    
.getStringWithMetadata(),
                                                             
connectionId.getConnectionIndex()),
                                                     
future.channel().localAddress(),
                                                     future.cause()));
@@ -275,7 +282,10 @@ public class NettyPartitionRequestClient implements 
PartitionRequestClient {
             final SocketAddress localAddr = tcpChannel.localAddress();
             final SocketAddress remoteAddr = tcpChannel.remoteAddress();
             throw new LocalTransportException(
-                    String.format("Channel to '%s' closed.", remoteAddr), 
localAddr);
+                    String.format(
+                            "Channel to '%s [%s]' closed.",
+                            remoteAddr, 
connectionId.getResourceID().getStringWithMetadata()),
+                    localAddr);
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index 7463cad9e70..ca92f052ffe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -80,6 +80,7 @@ class PartitionRequestClientFactory {
         // We map the input ConnectionID to a new value to restrict the number 
of tcp connections
         connectionId =
                 new ConnectionID(
+                        connectionId.getResourceID(),
                         connectionId.getAddress(),
                         connectionId.getConnectionIndex() % 
maxNumberOfConnections);
         while (true) {
@@ -164,6 +165,9 @@ class PartitionRequestClientFactory {
             throw new RemoteTransportException(
                     "Connecting to remote task manager '"
                             + connectionId.getAddress()
+                            + " [ "
+                            + 
connectionId.getResourceID().getStringWithMetadata()
+                            + " ] "
                             + "' has failed. This might indicate that the 
remote task "
                             + "manager has been lost.",
                     connectionId.getAddress(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
index 9831e949d30..e73e305497f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
@@ -48,7 +48,10 @@ public class NettyShuffleDescriptor implements 
ShuffleDescriptor {
     }
 
     public ConnectionID getConnectionId() {
-        return partitionConnectionInfo.getConnectionId();
+        return new ConnectionID(
+                producerLocation,
+                partitionConnectionInfo.getAddress(),
+                partitionConnectionInfo.getConnectionIndex());
     }
 
     @Override
@@ -66,9 +69,10 @@ public class NettyShuffleDescriptor implements 
ShuffleDescriptor {
     }
 
     /** Information for connection to partition producer for shuffle exchange. 
*/
-    @FunctionalInterface
     public interface PartitionConnectionInfo extends Serializable {
-        ConnectionID getConnectionId();
+        InetSocketAddress getAddress();
+
+        int getConnectionIndex();
     }
 
     /**
@@ -81,16 +85,22 @@ public class NettyShuffleDescriptor implements 
ShuffleDescriptor {
 
         private static final long serialVersionUID = 5992534320110743746L;
 
-        private final ConnectionID connectionID;
+        private final InetSocketAddress address;
+
+        private final int connectionIndex;
 
         @VisibleForTesting
-        public NetworkPartitionConnectionInfo(ConnectionID connectionID) {
-            this.connectionID = connectionID;
+        public NetworkPartitionConnectionInfo(InetSocketAddress address, int 
connectionIndex) {
+            this.address = address;
+            this.connectionIndex = connectionIndex;
         }
 
-        @Override
-        public ConnectionID getConnectionId() {
-            return connectionID;
+        public InetSocketAddress getAddress() {
+            return address;
+        }
+
+        public int getConnectionIndex() {
+            return connectionIndex;
         }
 
         static NetworkPartitionConnectionInfo fromProducerDescriptor(
@@ -98,7 +108,7 @@ public class NettyShuffleDescriptor implements 
ShuffleDescriptor {
             InetSocketAddress address =
                     new InetSocketAddress(
                             producerDescriptor.getAddress(), 
producerDescriptor.getDataPort());
-            return new NetworkPartitionConnectionInfo(new 
ConnectionID(address, connectionIndex));
+            return new NetworkPartitionConnectionInfo(address, 
connectionIndex);
         }
     }
 
@@ -111,7 +121,13 @@ public class NettyShuffleDescriptor implements 
ShuffleDescriptor {
         INSTANCE;
 
         @Override
-        public ConnectionID getConnectionId() {
+        public InetSocketAddress getAddress() {
+            throw new UnsupportedOperationException(
+                    "Local execution does not support shuffle connection.");
+        }
+
+        @Override
+        public int getConnectionIndex() {
             throw new UnsupportedOperationException(
                     "Local execution does not support shuffle connection.");
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 1fe6b9e329e..3ea6def9e6a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -71,7 +71,8 @@ class ResultPartitionDeploymentDescriptorTest {
 
     private static final ResourceID producerLocation = new 
ResourceID("producerLocation");
     private static final InetSocketAddress address = new 
InetSocketAddress("localhost", 10000);
-    private static final ConnectionID connectionID = new ConnectionID(address, 
connectionIndex);
+    private static final ConnectionID connectionID =
+            new ConnectionID(producerLocation, address, connectionIndex);
 
     /** Tests simple de/serialization with {@link UnknownShuffleDescriptor}. */
     @Test
@@ -90,7 +91,7 @@ class ResultPartitionDeploymentDescriptorTest {
         ShuffleDescriptor shuffleDescriptor =
                 new NettyShuffleDescriptor(
                         producerLocation,
-                        new NetworkPartitionConnectionInfo(connectionID),
+                        new NetworkPartitionConnectionInfo(address, 
connectionIndex),
                         resultPartitionID);
 
         ResultPartitionDeploymentDescriptor copy =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java
index bb0e96c9250..920380aad90 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java
@@ -66,9 +66,10 @@ public class ShuffleDescriptorTest extends TestLogger {
                             jobID, localPartitionId, consumerResourceID);
 
             ResultPartitionID remotePartitionId = new ResultPartitionID();
+            ResourceID remoteResourceID = ResourceID.generate();
             ResultPartitionDeploymentDescriptor remotePartition =
                     createResultPartitionDeploymentDescriptor(
-                            jobID, remotePartitionId, ResourceID.generate());
+                            jobID, remotePartitionId, remoteResourceID);
 
             ResultPartitionID unknownPartitionId = new ResultPartitionID();
 
@@ -118,7 +119,15 @@ public class ShuffleDescriptorTest extends TestLogger {
                         remotePartitionId);
                 nettyShuffleDescriptor = (NettyShuffleDescriptor) 
remoteShuffleDescriptor;
                 
assertThat(nettyShuffleDescriptor.isLocalTo(consumerResourceID), is(false));
-                assertThat(nettyShuffleDescriptor.getConnectionId(), 
is(STUB_CONNECTION_ID));
+                assertThat(
+                        nettyShuffleDescriptor.getConnectionId().getAddress(),
+                        is(STUB_CONNECTION_ID.getAddress()));
+                assertThat(
+                        
nettyShuffleDescriptor.getConnectionId().getConnectionIndex(),
+                        is(STUB_CONNECTION_ID.getConnectionIndex()));
+                assertThat(
+                        
nettyShuffleDescriptor.getConnectionId().getResourceID(),
+                        is(remoteResourceID));
             } else {
                 // Unknown (lazy deployment allowed)
                 verifyShuffleDescriptor(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
index 5830620d893..fae2163c97e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
@@ -45,6 +46,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -65,6 +67,8 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 class ClientTransportErrorHandlingTest {
+    private static final ConnectionID CONNECTION_ID =
+            new ConnectionID(ResourceID.generate(), new 
InetSocketAddress("localhost", 0), 0);
 
     /**
      * Verifies that failed client requests via {@link PartitionRequestClient} 
are correctly
@@ -113,10 +117,7 @@ class ClientTransportErrorHandlingTest {
 
         PartitionRequestClient requestClient =
                 new NettyPartitionRequestClient(
-                        ch,
-                        handler,
-                        mock(ConnectionID.class),
-                        mock(PartitionRequestClientFactory.class));
+                        ch, handler, CONNECTION_ID, 
mock(PartitionRequestClientFactory.class));
 
         // Create input channels
         RemoteInputChannel[] rich =
@@ -396,7 +397,9 @@ class ClientTransportErrorHandlingTest {
     }
 
     private NetworkClientHandler getClientHandler(Channel ch) {
-        return ch.pipeline().get(NetworkClientHandler.class);
+        NetworkClientHandler networkClientHandler = 
ch.pipeline().get(NetworkClientHandler.class);
+        networkClientHandler.setConnectionId(CONNECTION_ID);
+        return networkClientHandler;
     }
 
     private RemoteInputChannel createRemoteInputChannel() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index 1ea783d3e34..6b39979fac8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.TestingConnectionManager;
@@ -62,6 +63,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 
 import static 
org.apache.flink.runtime.io.network.netty.PartitionRequestQueueTest.blockChannel;
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel;
@@ -640,6 +642,8 @@ class CreditBasedPartitionRequestClientHandlerTest {
             Class<? extends TransportException> expectedClass, Exception 
cause) {
         CreditBasedPartitionRequestClientHandler handler =
                 new CreditBasedPartitionRequestClientHandler();
+        handler.setConnectionId(
+                new ConnectionID(ResourceID.generate(), new 
InetSocketAddress("localhost", 0), 0));
         EmbeddedChannel embeddedChannel =
                 new EmbeddedChannel(
                         // A test handler to trigger the exception.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
index f2b66c42490..6ede9dbdc90 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
@@ -287,7 +288,8 @@ public class NettyPartitionRequestClientTest {
         try (NetUtils.Port availablePort = NetUtils.getAvailablePort()) {
             int port = availablePort.getPort();
             ConnectionID connectionID =
-                    new ConnectionID(new InetSocketAddress("localhost", port), 
0);
+                    new ConnectionID(
+                            ResourceID.generate(), new 
InetSocketAddress("localhost", port), 0);
             NettyConfig config =
                     new NettyConfig(InetAddress.getLocalHost(), port, 1024, 1, 
new Configuration());
             NettyClient nettyClient = new NettyClient(config);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
index 0ec9b5380da..5e4b5b68711 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.util.NetUtils;
 
@@ -224,8 +225,9 @@ public class NettyTestUtil {
             return client;
         }
 
-        ConnectionID getConnectionID(int connectionIndex) {
+        ConnectionID getConnectionID(ResourceID resourceID, int 
connectionIndex) {
             return new ConnectionID(
+                    resourceID,
                     new InetSocketAddress(
                             server.getConfig().getServerAddress(),
                             server.getConfig().getServerPort()),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index 009fe74915c..c676046913d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
@@ -55,6 +56,8 @@ import static org.mockito.Mockito.mock;
 /** {@link PartitionRequestClientFactory} test. */
 @ExtendWith(ParameterizedTestExtension.class)
 public class PartitionRequestClientFactoryTest extends TestLogger {
+    private static final ResourceID RESOURCE_ID = ResourceID.generate();
+
     @Parameter public boolean connectionReuseEnabled;
 
     @Parameters(name = "connectionReuseEnabled={0}")
@@ -72,10 +75,11 @@ public class PartitionRequestClientFactoryTest extends 
TestLogger {
                     new PartitionRequestClientFactory(nettyClient, 
connectionReuseEnabled);
 
             nettyClient.awaitForInterrupts = true;
-            connectAndInterrupt(factory, 
nettyServerAndClient.getConnectionID(0));
+            connectAndInterrupt(factory, 
nettyServerAndClient.getConnectionID(RESOURCE_ID, 0));
 
             nettyClient.awaitForInterrupts = false;
-            
factory.createPartitionRequestClient(nettyServerAndClient.getConnectionID(0));
+            factory.createPartitionRequestClient(
+                    nettyServerAndClient.getConnectionID(RESOURCE_ID, 0));
         } finally {
             nettyServerAndClient.client().shutdown();
             nettyServerAndClient.server().shutdown();
@@ -114,7 +118,7 @@ public class PartitionRequestClientFactoryTest extends 
TestLogger {
                             new 
UnstableNettyClient(nettyServerAndClient.client(), 1),
                             connectionReuseEnabled);
 
-            final ConnectionID connectionID = 
nettyServerAndClient.getConnectionID(0);
+            final ConnectionID connectionID = 
nettyServerAndClient.getConnectionID(RESOURCE_ID, 0);
             assertThatThrownBy(() -> 
factory.createPartitionRequestClient(connectionID))
                     .withFailMessage("Expected the first request to fail.")
                     .isInstanceOf(RemoteTransportException.class);
@@ -153,7 +157,8 @@ public class PartitionRequestClientFactoryTest extends 
TestLogger {
                         connectionReuseEnabled);
         for (int i = 0; i < Math.max(100, maxNumberOfConnections); i++) {
             final ConnectionID connectionID =
-                    nettyServerAndClient.getConnectionID((int) (Math.random() 
* Integer.MAX_VALUE));
+                    nettyServerAndClient.getConnectionID(
+                            RESOURCE_ID, (int) (Math.random() * 
Integer.MAX_VALUE));
             set.add(factory.createPartitionRequestClient(connectionID));
         }
         assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
@@ -169,7 +174,7 @@ public class PartitionRequestClientFactoryTest extends 
TestLogger {
                 new PartitionRequestClientFactory(
                         unstableNettyClient, 2, 1, connectionReuseEnabled);
 
-        
factory.createPartitionRequestClient(serverAndClient.getConnectionID(0));
+        
factory.createPartitionRequestClient(serverAndClient.getConnectionID(RESOURCE_ID,
 0));
 
         serverAndClient.client().shutdown();
         serverAndClient.server().shutdown();
@@ -186,6 +191,7 @@ public class PartitionRequestClientFactoryTest extends 
TestLogger {
                 () ->
                         factory.createPartitionRequestClient(
                                 new ConnectionID(
+                                        ResourceID.generate(),
                                         new 
InetSocketAddress(InetAddress.getLocalHost(), 8080),
                                         0)));
 
@@ -193,6 +199,7 @@ public class PartitionRequestClientFactoryTest extends 
TestLogger {
                         () ->
                                 factory.createPartitionRequestClient(
                                         new ConnectionID(
+                                                ResourceID.generate(),
                                                 new InetSocketAddress(
                                                         
InetAddress.getLocalHost(), 8080),
                                                 0)))
@@ -213,7 +220,7 @@ public class PartitionRequestClientFactoryTest extends 
TestLogger {
             assertThatThrownBy(
                             () -> {
                                 factory.createPartitionRequestClient(
-                                        serverAndClient.getConnectionID(0));
+                                        
serverAndClient.getConnectionID(RESOURCE_ID, 0));
                             })
                     .isInstanceOf(IOException.class);
         } finally {
@@ -243,7 +250,8 @@ public class PartitionRequestClientFactoryTest extends 
TestLogger {
                                 try {
                                     client =
                                             
factory.createPartitionRequestClient(
-                                                    
serverAndClient.getConnectionID(0));
+                                                    
serverAndClient.getConnectionID(
+                                                            RESOURCE_ID, 0));
                                 } catch (Exception e) {
                                     fail(e.getMessage());
                                 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
index bb5d24a736b..622a1392a03 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
@@ -38,7 +39,7 @@ import static 
org.apache.flink.runtime.io.network.partition.consumer.SingleInput
 /** Builder for various {@link InputChannel} types. */
 public class InputChannelBuilder {
     public static final ConnectionID STUB_CONNECTION_ID =
-            new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
+            new ConnectionID(ResourceID.generate(), new 
InetSocketAddress("localhost", 5000), 0);
 
     private int channelIndex = 0;
     private ResultPartitionID partitionId = new ResultPartitionID();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
index ee641da9f99..9de1228d9d1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
@@ -73,10 +72,11 @@ public class NettyShuffleDescriptorBuilder {
     }
 
     public NettyShuffleDescriptor buildRemote() {
-        ConnectionID connectionID =
-                new ConnectionID(new InetSocketAddress(address, dataPort), 
connectionIndex);
         return new NettyShuffleDescriptor(
-                producerLocation, new 
NetworkPartitionConnectionInfo(connectionID), id);
+                producerLocation,
+                new NetworkPartitionConnectionInfo(
+                        new InetSocketAddress(address, dataPort), 
connectionIndex),
+                id);
     }
 
     public NettyShuffleDescriptor buildLocal() {


Reply via email to