Repository: flink
Updated Branches:
  refs/heads/master.staging [created] 598e46044


[FLINK-9785][network] add remote address information to LocalTransportException 
instances

This closes #6291


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e87e203
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e87e203
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e87e203

Branch: refs/heads/master.staging
Commit: 9e87e2030d5a8232b80e6dbb93166fb9ac9ecb53
Parents: 3f0b9fe
Author: Nico Kruber <[email protected]>
Authored: Mon Jul 9 16:49:15 2018 +0200
Committer: Nico Kruber <[email protected]>
Committed: Thu Jul 12 11:46:03 2018 +0200

----------------------------------------------------------------------
 .../CreditBasedPartitionRequestClientHandler.java    |  4 +++-
 .../io/network/netty/PartitionRequestClient.java     | 15 ++++++++++-----
 .../network/netty/PartitionRequestClientFactory.java |  6 ++++--
 .../network/netty/PartitionRequestClientHandler.java |  6 +++++-
 4 files changed, 22 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e87e203/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 47fbdb2..9aa3920 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -167,7 +167,9 @@ class CreditBasedPartitionRequestClientHandler extends 
ChannelInboundHandlerAdap
                                tex = new RemoteTransportException("Lost 
connection to task manager '" + remoteAddr + "'. " +
                                        "This indicates that the remote task 
manager was lost.", remoteAddr, cause);
                        } else {
-                               tex = new 
LocalTransportException(cause.getMessage(), ctx.channel().localAddress(), 
cause);
+                               final SocketAddress localAddr = 
ctx.channel().localAddress();
+                               tex = new LocalTransportException(
+                                       String.format("%s (connection to 
'%s')", cause.getMessage(), remoteAddr), localAddr, cause);
                        }
 
                        notifyAllChannelsOfErrorAndClose(tex);

http://git-wip-us.apache.org/repos/asf/flink/blob/9e87e203/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index 91dc2d5..27d341a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.SocketAddress;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
@@ -114,10 +115,11 @@ public class PartitionRequestClient {
                        public void operationComplete(ChannelFuture future) 
throws Exception {
                                if (!future.isSuccess()) {
                                        
clientHandler.removeInputChannel(inputChannel);
+                                       SocketAddress remoteAddr = 
future.channel().remoteAddress();
                                        inputChannel.onError(
                                                        new 
LocalTransportException(
-                                                                       
"Sending the partition request failed.",
-                                                                       
future.channel().localAddress(), future.cause()
+                                                               
String.format("Sending the partition request to '%s' failed.", remoteAddr),
+                                                               
future.channel().localAddress(), future.cause()
                                                        ));
                                }
                        }
@@ -158,9 +160,10 @@ public class PartitionRequestClient {
                                                        @Override
                                                        public void 
operationComplete(ChannelFuture future) throws Exception {
                                                                if 
(!future.isSuccess()) {
+                                                                       
SocketAddress remoteAddr = future.channel().remoteAddress();
                                                                        
inputChannel.onError(new LocalTransportException(
-                                                                               
        "Sending the task event failed.",
-                                                                               
        future.channel().localAddress(), future.cause()
+                                                                               
String.format("Sending the task event to '%s' failed.", remoteAddr),
+                                                                               
future.channel().localAddress(), future.cause()
                                                                        ));
                                                                }
                                                        }
@@ -193,7 +196,9 @@ public class PartitionRequestClient {
 
        private void checkNotClosed() throws IOException {
                if (closeReferenceCounter.isDisposed()) {
-                       throw new LocalTransportException("Channel closed.", 
tcpChannel.localAddress());
+                       final SocketAddress localAddr = 
tcpChannel.localAddress();
+                       final SocketAddress remoteAddr = 
tcpChannel.remoteAddress();
+                       throw new 
LocalTransportException(String.format("Channel to '%s' closed.", remoteAddr), 
localAddr);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9e87e203/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index ea9d3aa..2e357c4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -220,8 +220,10 @@ class PartitionRequestClientFactory {
                        }
                        else {
                                notifyOfError(new LocalTransportException(
-                                               "Connecting to remote task 
manager + '" + connectionId.getAddress() +
-                                                               "' has been 
cancelled.", null));
+                                       String.format(
+                                               "Connecting to remote task 
manager '%s' has been cancelled.",
+                                               connectionId.getAddress()),
+                                       null));
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9e87e203/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 2279deb..367c62d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -164,7 +164,11 @@ class PartitionRequestClientHandler extends 
ChannelInboundHandlerAdapter impleme
                                                                + "that the 
remote task manager was lost.", remoteAddr, cause);
                        }
                        else {
-                               tex = new 
LocalTransportException(cause.getMessage(), ctx.channel().localAddress(), 
cause);
+                               SocketAddress localAddr = 
ctx.channel().localAddress();
+                               tex = new LocalTransportException(
+                                       String.format("%s (connection to 
'%s')", cause.getMessage(), remoteAddr),
+                                       localAddr,
+                                       cause);
                        }
 
                        notifyAllChannelsOfErrorAndClose(tex);

Reply via email to