Repository: spark
Updated Branches:
  refs/heads/master 242be7dae -> 7cfa4c6bc


[SPARK-11865][NETWORK] Avoid returning inactive client in TransportClientFactory.

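There's a very narrow race here where it would be possible for the timeout handler
to close a channel after the client factory verified that the channel was still
active. This change makes sure the client is marked as being recently in use so
that the timeout handler does not close it until a new timeout cycle elapses.
The timeout handler also now marks the client as timed out before closing the
channel, and the factory re-checks the client's state after updating its last-use
time, so a client that the handler has already decided to close is never returned.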

Author: Marcelo Vanzin <[email protected]>

Closes #9853 from vanzin/SPARK-11865.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7cfa4c6b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7cfa4c6b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7cfa4c6b

Branch: refs/heads/master
Commit: 7cfa4c6bc36d97e459d4adee7b03d537d63c337e
Parents: 242be7d
Author: Marcelo Vanzin <[email protected]>
Authored: Mon Nov 23 13:51:43 2015 -0800
Committer: Marcelo Vanzin <[email protected]>
Committed: Mon Nov 23 13:51:43 2015 -0800

----------------------------------------------------------------------
 .../spark/network/client/TransportClient.java   |  9 ++++-
 .../network/client/TransportClientFactory.java  | 15 ++++++--
 .../client/TransportResponseHandler.java        |  9 +++--
 .../network/server/TransportChannelHandler.java | 36 +++++++++++++-------
 4 files changed, 52 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7cfa4c6b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
index a0ba223..876fcd8 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -73,10 +73,12 @@ public class TransportClient implements Closeable {
   private final Channel channel;
   private final TransportResponseHandler handler;
   @Nullable private String clientId;
+  private volatile boolean timedOut;
 
   public TransportClient(Channel channel, TransportResponseHandler handler) {
     this.channel = Preconditions.checkNotNull(channel);
     this.handler = Preconditions.checkNotNull(handler);
+    this.timedOut = false;
   }
 
   public Channel getChannel() {
@@ -84,7 +86,7 @@ public class TransportClient implements Closeable {
   }
 
   public boolean isActive() {
-    return channel.isOpen() || channel.isActive();
+    return !timedOut && (channel.isOpen() || channel.isActive());
   }
 
   public SocketAddress getSocketAddress() {
@@ -263,6 +265,11 @@ public class TransportClient implements Closeable {
     }
   }
 
+  /** Mark this channel as having timed out. */
+  public void timeOut() {
+    this.timedOut = true;
+  }
+
   @Override
   public void close() {
     // close is a local operation and should finish with milliseconds; timeout 
just to be safe

http://git-wip-us.apache.org/repos/asf/spark/blob/7cfa4c6b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 42a4f66..659c471 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -136,8 +136,19 @@ public class TransportClientFactory implements Closeable {
     TransportClient cachedClient = clientPool.clients[clientIndex];
 
     if (cachedClient != null && cachedClient.isActive()) {
-      logger.trace("Returning cached connection to {}: {}", address, 
cachedClient);
-      return cachedClient;
+      // Make sure that the channel will not timeout by updating the last use 
time of the
+      // handler. Then check that the client is still alive, in case it timed 
out before
+      // this code was able to update things.
+      TransportChannelHandler handler = cachedClient.getChannel().pipeline()
+        .get(TransportChannelHandler.class);
+      synchronized (handler) {
+        handler.getResponseHandler().updateTimeOfLastRequest();
+      }
+
+      if (cachedClient.isActive()) {
+        logger.trace("Returning cached connection to {}: {}", address, 
cachedClient);
+        return cachedClient;
+      }
     }
 
     // If we reach here, we don't have an existing connection open. Let's 
create a new one.

http://git-wip-us.apache.org/repos/asf/spark/blob/7cfa4c6b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
index cc88991..be181e0 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
@@ -71,7 +71,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
   }
 
   public void addFetchRequest(StreamChunkId streamChunkId, 
ChunkReceivedCallback callback) {
-    timeOfLastRequestNs.set(System.nanoTime());
+    updateTimeOfLastRequest();
     outstandingFetches.put(streamChunkId, callback);
   }
 
@@ -80,7 +80,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
   }
 
   public void addRpcRequest(long requestId, RpcResponseCallback callback) {
-    timeOfLastRequestNs.set(System.nanoTime());
+    updateTimeOfLastRequest();
     outstandingRpcs.put(requestId, callback);
   }
 
@@ -227,4 +227,9 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
     return timeOfLastRequestNs.get();
   }
 
+  /** Updates the time of the last request to the current system time. */
+  public void updateTimeOfLastRequest() {
+    timeOfLastRequestNs.set(System.nanoTime());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7cfa4c6b/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index f8fcd1c..29d688a 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -116,20 +116,32 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
       // there are outstanding requests, we also do a secondary consistency 
check to ensure
       // there's no race between the idle timeout and incrementing the 
numOutstandingRequests
       // (see SPARK-7003).
-      boolean isActuallyOverdue =
-        System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > 
requestTimeoutNs;
-      if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
-        if (responseHandler.numOutstandingRequests() > 0) {
-          String address = NettyUtils.getRemoteAddress(ctx.channel());
-          logger.error("Connection to {} has been quiet for {} ms while there 
are outstanding " +
-            "requests. Assuming connection is dead; please adjust 
spark.network.timeout if this " +
-            "is wrong.", address, requestTimeoutNs / 1000 / 1000);
-          ctx.close();
-        } else if (closeIdleConnections) {
-          // While CloseIdleConnections is enable, we also close idle 
connection
-          ctx.close();
+      //
+      // To avoid a race between TransportClientFactory.createClient() and 
this code which could
+      // result in an inactive client being returned, this needs to run in a 
synchronized block.
+      synchronized (this) {
+        boolean isActuallyOverdue =
+          System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > 
requestTimeoutNs;
+        if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
+          if (responseHandler.numOutstandingRequests() > 0) {
+            String address = NettyUtils.getRemoteAddress(ctx.channel());
+            logger.error("Connection to {} has been quiet for {} ms while 
there are outstanding " +
+              "requests. Assuming connection is dead; please adjust 
spark.network.timeout if this " +
+              "is wrong.", address, requestTimeoutNs / 1000 / 1000);
+            client.timeOut();
+            ctx.close();
+          } else if (closeIdleConnections) {
+            // While CloseIdleConnections is enable, we also close idle 
connection
+            client.timeOut();
+            ctx.close();
+          }
         }
       }
     }
   }
+
+  public TransportResponseHandler getResponseHandler() {
+    return responseHandler;
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to