Repository: hadoop
Updated Branches:
  refs/heads/trunk 6329bd00f -> fb6b38d67


HADOOP-11772. RPC Invoker relies on static ClientCache which has 
synchronized(this) blocks. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fb6b38d6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fb6b38d6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fb6b38d6

Branch: refs/heads/trunk
Commit: fb6b38d67d8b997eca498fc5010b037e3081ace7
Parents: 6329bd0
Author: Haohui Mai <whe...@apache.org>
Authored: Wed May 20 20:10:50 2015 -0700
Committer: Haohui Mai <whe...@apache.org>
Committed: Wed May 20 20:10:50 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../main/java/org/apache/hadoop/ipc/Client.java | 106 ++++++-------------
 2 files changed, 35 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb6b38d6/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt 
b/hadoop-common-project/hadoop-common/CHANGES.txt
index 1624ce2..416b819 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -604,6 +604,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-11970. Replace uses of ThreadLocal<Random> with JDK7
     ThreadLocalRandom.  (Sean Busbey via Colin P. McCabe)
 
+    HADOOP-11772. RPC Invoker relies on static ClientCache which has
+    synchronized(this) blocks. (wheat9)
+
   BUG FIXES
     HADOOP-11802: DomainSocketWatcher thread terminates sometimes after there
     is an I/O error during requestShortCircuitShm (cmccabe)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb6b38d6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index f28d8a2..feb811e 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -43,6 +43,7 @@ import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -56,6 +57,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.net.SocketFactory;
 import javax.security.sasl.Sasl;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -124,8 +127,8 @@ public class Client {
     retryCount.set(rc);
   }
 
-  private Hashtable<ConnectionId, Connection> connections =
-    new Hashtable<ConnectionId, Connection>();
+  private final Cache<ConnectionId, Connection> connections =
+      CacheBuilder.newBuilder().build();
 
   private Class<? extends Writable> valueClass;   // class of call values
   private AtomicBoolean running = new AtomicBoolean(true); // if client runs
@@ -1167,13 +1170,7 @@ public class Client {
         return;
       }
 
-      // release the resources
-      // first thing to do;take the connection out of the connection list
-      synchronized (connections) {
-        if (connections.get(remoteId) == this) {
-          connections.remove(remoteId);
-        }
-      }
+      connections.invalidate(remoteId);
 
       // close the streams and therefore the socket
       IOUtils.closeStream(out);
@@ -1260,14 +1257,12 @@ public class Client {
     }
     
     // wake up all connections
-    synchronized (connections) {
-      for (Connection conn : connections.values()) {
-        conn.interrupt();
-      }
+    for (Connection conn : connections.asMap().values()) {
+      conn.interrupt();
     }
     
     // wait until all connections are closed
-    while (!connections.isEmpty()) {
+    while (connections.size() > 0) {
       try {
         Thread.sleep(100);
       } catch (InterruptedException e) {
@@ -1283,56 +1278,12 @@ public class Client {
    */
   public Writable call(Writable param, InetSocketAddress address)
       throws IOException {
-    return call(RPC.RpcKind.RPC_BUILTIN, param, address);
-    
-  }
-  /** Make a call, passing <code>param</code>, to the IPC server running at
-   * <code>address</code>, returning the value.  Throws exceptions if there are
-   * network problems or if the remote code threw an exception.
-   * @deprecated Use {@link #call(RPC.RpcKind, Writable,
-   *  ConnectionId)} instead 
-   */
-  @Deprecated
-  public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress 
address)
-  throws IOException {
-      return call(rpcKind, param, address, null);
-  }
-  
-  /** Make a call, passing <code>param</code>, to the IPC server running at
-   * <code>address</code> with the <code>ticket</code> credentials, returning 
-   * the value.  
-   * Throws exceptions if there are network problems or if the remote code 
-   * threw an exception.
-   * @deprecated Use {@link #call(RPC.RpcKind, Writable, 
-   * ConnectionId)} instead 
-   */
-  @Deprecated
-  public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress 
addr, 
-      UserGroupInformation ticket) throws IOException {
-    ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0,
+    ConnectionId remoteId = ConnectionId.getConnectionId(address, null, null, 
0,
         conf);
-    return call(rpcKind, param, remoteId);
-  }
-  
-  /** Make a call, passing <code>param</code>, to the IPC server running at
-   * <code>address</code> which is servicing the <code>protocol</code> 
protocol, 
-   * with the <code>ticket</code> credentials and <code>rpcTimeout</code> as 
-   * timeout, returning the value.  
-   * Throws exceptions if there are network problems or if the remote code 
-   * threw an exception. 
-   * @deprecated Use {@link #call(RPC.RpcKind, Writable,
-   *  ConnectionId)} instead 
-   */
-  @Deprecated
-  public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress 
addr, 
-                       Class<?> protocol, UserGroupInformation ticket,
-                       int rpcTimeout) throws IOException {
-    ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
-        ticket, rpcTimeout, conf);
-    return call(rpcKind, param, remoteId);
+    return call(RpcKind.RPC_BUILTIN, param, remoteId);
+
   }
 
-  
   /**
    * Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress,
    * Class, UserGroupInformation, int, Configuration)}
@@ -1506,15 +1457,14 @@ public class Client {
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   Set<ConnectionId> getConnectionIds() {
-    synchronized (connections) {
-      return connections.keySet();
-    }
+    return connections.asMap().keySet();
   }
   
   /** Get a connection from the pool, or create a new one and add it to the
    * pool.  Connections to a given ConnectionId are reused. */
-  private Connection getConnection(ConnectionId remoteId,
-      Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
+  private Connection getConnection(
+      final ConnectionId remoteId,
+      Call call, final int serviceClass, AtomicBoolean fallbackToSimpleAuth)
       throws IOException {
     if (!running.get()) {
       // the client is stopped
@@ -1525,15 +1475,23 @@ public class Client {
      * connectionsId object and with set() method. We need to manage the
      * refs for keys in HashMap properly. For now its ok.
      */
-    do {
-      synchronized (connections) {
-        connection = connections.get(remoteId);
-        if (connection == null) {
-          connection = new Connection(remoteId, serviceClass);
-          connections.put(remoteId, connection);
-        }
+    while(true) {
+      try {
+        connection = connections.get(remoteId, new Callable<Connection>() {
+          @Override
+          public Connection call() throws Exception {
+            return new Connection(remoteId, serviceClass);
+          }
+        });
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+      if (connection.addCall(call)) {
+        break;
+      } else {
+        connections.invalidate(remoteId);
       }
-    } while (!connection.addCall(call));
+    }
     
     //we don't invoke the method below inside "synchronized (connections)"
     //block above. The reason for that is if the server happens to be slow,

Reply via email to