This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.8
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.8 by this push:
     new 9771aac  ACCUMULO-4788 Make connection pool O(1) instead of O(N) (#385)
9771aac is described below

commit 9771aac269640ad25f5c7dc7f64d55e8316c1825
Author: Keith Turner <ke...@deenlo.com>
AuthorDate: Wed Feb 14 16:34:01 2018 -0500

    ACCUMULO-4788 Make connection pool O(1) instead of O(N) (#385)
    
    The Jira issue mentions two problems inefficiency and a global lock. This 
commit focuses on the inefficiency issue and makes the reserve and return 
operations O(1) instead of O(N). The global lock is stil there, but now 
operations done while the lock are held are much faster. I plan to open up a 
follow on issue to address the lock.
---
 .../core/client/impl/ThriftTransportKey.java       |   4 +
 .../core/client/impl/ThriftTransportPool.java      | 141 +++++++++++----------
 2 files changed, 79 insertions(+), 66 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
 
b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
index f1ab501..dcae49c 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
@@ -86,6 +86,10 @@ public class ThriftTransportKey {
         && (!isSasl() || (ttk.isSasl() && saslParams.equals(ttk.saslParams)));
   }
 
+  public final void precomputeHashCode() {
+    hashCode();
+  }
+
   @Override
   public int hashCode() {
     if (hash == -1)
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
 
b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index 3d36e69..c221607 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -41,6 +41,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 
 public class ThriftTransportPool {
   private static SecurityPermission TRANSPORT_POOL_PERMISSION = new 
SecurityPermission("transportPoolPermission");
@@ -48,7 +50,26 @@ public class ThriftTransportPool {
   private static final Random random = new Random();
   private long killTime = 1000 * 3;
 
-  private Map<ThriftTransportKey,List<CachedConnection>> cache = new 
HashMap<>();
+  private static class CachedConnections {
+    LinkedList<CachedConnection> unreserved = new LinkedList<>();
+    Map<CachedTTransport,CachedConnection> reserved = new HashMap<>();
+
+    public CachedConnection reserveAny() {
+      if (unreserved.size() > 0) {
+        CachedConnection cachedConnection = unreserved.removeFirst();
+        cachedConnection.reserve();
+        reserved.put(cachedConnection.transport, cachedConnection);
+        if (log.isTraceEnabled()) {
+          log.trace("Using existing connection to {}", 
cachedConnection.transport.cacheKey);
+        }
+        return cachedConnection;
+      }
+
+      return null;
+    }
+  }
+
+  private Map<ThriftTransportKey,CachedConnections> cache = new HashMap<>();
   private Map<ThriftTransportKey,Long> errorCount = new HashMap<>();
   private Map<ThriftTransportKey,Long> errorTime = new HashMap<>();
   private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<>();
@@ -66,15 +87,17 @@ public class ThriftTransportPool {
       this.transport = t;
     }
 
-    void setReserved(boolean reserved) {
-      this.transport.setReserved(reserved);
+    void reserve() {
+      Preconditions.checkState(!this.transport.reserved);
+      this.transport.setReserved(true);
     }
 
-    boolean isReserved() {
-      return this.transport.reserved;
+    void unreserve() {
+      Preconditions.checkState(this.transport.reserved);
+      this.transport.setReserved(false);
     }
 
-    CachedTTransport transport;
+    final CachedTTransport transport;
 
     long lastReturnTime;
   }
@@ -98,20 +121,18 @@ public class ThriftTransportPool {
         ArrayList<CachedConnection> connectionsToClose = new ArrayList<>();
 
         synchronized (pool) {
-          for (List<CachedConnection> ccl : pool.getCache().values()) {
-            Iterator<CachedConnection> iter = ccl.iterator();
+          for (CachedConnections cachedConns : pool.getCache().values()) {
+            Iterator<CachedConnection> iter = 
cachedConns.unreserved.iterator();
             while (iter.hasNext()) {
               CachedConnection cachedConnection = iter.next();
 
-              if (!cachedConnection.isReserved() && System.currentTimeMillis() 
- cachedConnection.lastReturnTime > pool.killTime) {
+              if (System.currentTimeMillis() - cachedConnection.lastReturnTime 
> pool.killTime) {
                 connectionsToClose.add(cachedConnection);
                 iter.remove();
               }
             }
-          }
 
-          for (List<CachedConnection> ccl : pool.getCache().values()) {
-            for (CachedConnection cachedConnection : ccl) {
+            for (CachedConnection cachedConnection : 
cachedConns.reserved.values()) {
               cachedConnection.transport.checkForStuckIO(STUCK_THRESHOLD);
             }
           }
@@ -389,21 +410,21 @@ public class ThriftTransportPool {
   }
 
   private TTransport getTransport(ThriftTransportKey cacheKey) throws 
TTransportException {
+    // compute hash code outside of lock, this lowers the time the lock is held
+    cacheKey.precomputeHashCode();
     synchronized (this) {
       // atomically reserve location if it exist in cache
-      List<CachedConnection> ccl = getCache().get(cacheKey);
+      CachedConnections ccl = getCache().get(cacheKey);
 
       if (ccl == null) {
-        ccl = new LinkedList<>();
+        ccl = new CachedConnections();
         getCache().put(cacheKey, ccl);
       }
 
-      for (CachedConnection cachedConnection : ccl) {
-        if (!cachedConnection.isReserved()) {
-          cachedConnection.setReserved(true);
-          log.trace("Using existing connection to {}", cacheKey.getServer());
-          return cachedConnection.transport;
-        }
+      CachedConnection cachedConnection = ccl.reserveAny();
+      if (cachedConnection != null) {
+        log.trace("Using existing connection to {}", cacheKey.getServer());
+        return cachedConnection.transport;
       }
     }
 
@@ -428,13 +449,11 @@ public class ThriftTransportPool {
           Collections.shuffle(cachedServers, random);
 
           for (ThriftTransportKey ttk : cachedServers) {
-            for (CachedConnection cachedConnection : getCache().get(ttk)) {
-              if (!cachedConnection.isReserved()) {
-                cachedConnection.setReserved(true);
-                final String serverAddr = ttk.getServer().toString();
-                log.trace("Using existing connection to {}", serverAddr);
-                return new Pair<String,TTransport>(serverAddr, 
cachedConnection.transport);
-              }
+            CachedConnection cachedConnection = 
getCache().get(ttk).reserveAny();
+            if (cachedConnection != null) {
+              final String serverAddr = ttk.getServer().toString();
+              log.trace("Using existing connection to {}", serverAddr);
+              return new Pair<String,TTransport>(serverAddr, 
cachedConnection.transport);
             }
           }
         }
@@ -448,15 +467,12 @@ public class ThriftTransportPool {
 
       if (preferCachedConnection) {
         synchronized (this) {
-          List<CachedConnection> cachedConnList = getCache().get(ttk);
-          if (cachedConnList != null) {
-            for (CachedConnection cachedConnection : cachedConnList) {
-              if (!cachedConnection.isReserved()) {
-                cachedConnection.setReserved(true);
-                final String serverAddr = ttk.getServer().toString();
-                log.trace("Using existing connection to {} timeout {}", 
serverAddr, ttk.getTimeout());
-                return new Pair<String,TTransport>(serverAddr, 
cachedConnection.transport);
-              }
+          CachedConnections cachedConns = getCache().get(ttk);
+          if (cachedConns != null) {
+            CachedConnection cachedConnection = cachedConns.reserveAny();
+            if (cachedConnection != null) {
+              final String serverAddr = ttk.getServer().toString();
+              return new Pair<String,TTransport>(serverAddr, 
cachedConnection.transport);
             }
           }
         }
@@ -483,18 +499,18 @@ public class ThriftTransportPool {
     CachedTTransport tsc = new CachedTTransport(transport, cacheKey);
 
     CachedConnection cc = new CachedConnection(tsc);
-    cc.setReserved(true);
+    cc.reserve();
 
     try {
       synchronized (this) {
-        List<CachedConnection> ccl = getCache().get(cacheKey);
+        CachedConnections cachedConns = getCache().get(cacheKey);
 
-        if (ccl == null) {
-          ccl = new LinkedList<>();
-          getCache().put(cacheKey, ccl);
+        if (cachedConns == null) {
+          cachedConns = new CachedConnections();
+          getCache().put(cacheKey, cachedConns);
         }
 
-        ccl.add(cc);
+        cachedConns.reserved.put(cc.transport, cc);
       }
     } catch (TransportPoolShutdownException e) {
       cc.transport.close();
@@ -514,13 +530,12 @@ public class ThriftTransportPool {
     ArrayList<CachedConnection> closeList = new ArrayList<>();
 
     synchronized (this) {
-      List<CachedConnection> ccl = getCache().get(ctsc.getCacheKey());
-      for (Iterator<CachedConnection> iterator = ccl.iterator(); 
iterator.hasNext();) {
-        CachedConnection cachedConnection = iterator.next();
-        if (cachedConnection.transport == tsc) {
+      CachedConnections cachedConns = getCache().get(ctsc.getCacheKey());
+      if (cachedConns != null) {
+        CachedConnection cachedConnection = cachedConns.reserved.remove(ctsc);
+        if (cachedConnection != null) {
           if (ctsc.sawError) {
             closeList.add(cachedConnection);
-            iterator.remove();
 
             log.trace("Returned connection had error {}", ctsc.getCacheKey());
 
@@ -540,27 +555,23 @@ public class ThriftTransportPool {
               serversWarnedAbout.add(ctsc.getCacheKey());
             }
 
-            cachedConnection.setReserved(false);
+            cachedConnection.unreserve();
+
+            // remove all unreserved cached connection when a sever has an 
error, not just the connection that was returned
+            closeList.addAll(cachedConns.unreserved);
+            cachedConns.unreserved.clear();
 
           } else {
             log.trace("Returned connection {} ioCount: {}", 
ctsc.getCacheKey(), cachedConnection.transport.ioCount);
 
             cachedConnection.lastReturnTime = System.currentTimeMillis();
-            cachedConnection.setReserved(false);
+            cachedConnection.unreserve();
+            // Calling addFirst to use unreserved as LIFO queue. Using LIFO 
ensures that when the # of pooled connections exceeds the working set size that 
the
+            // idle times at the end of the list grow. The connections with 
large idle times will be cleaned up. Using a FIFO could continually reset the 
idle
+            // times of all connections, even when there are more than the 
working set size.
+            cachedConns.unreserved.addFirst(cachedConnection);
           }
           existInCache = true;
-          break;
-        }
-      }
-
-      // remove all unreserved cached connection when a sever has an error, 
not just the connection that was returned
-      if (ctsc.sawError) {
-        for (Iterator<CachedConnection> iterator = ccl.iterator(); 
iterator.hasNext();) {
-          CachedConnection cachedConnection = iterator.next();
-          if (!cachedConnection.isReserved()) {
-            closeList.add(cachedConnection);
-            iterator.remove();
-          }
         }
       }
     }
@@ -616,10 +627,8 @@ public class ThriftTransportPool {
         return;
 
       // close any connections in the pool... even ones that are in use
-      for (List<CachedConnection> ccl : getCache().values()) {
-        Iterator<CachedConnection> iter = ccl.iterator();
-        while (iter.hasNext()) {
-          CachedConnection cc = iter.next();
+      for (CachedConnections cachedConn : getCache().values()) {
+        for (CachedConnection cc : 
Iterables.concat(cachedConn.reserved.values(), cachedConn.unreserved)) {
           try {
             cc.transport.close();
           } catch (Exception e) {
@@ -639,7 +648,7 @@ public class ThriftTransportPool {
     }
   }
 
-  private Map<ThriftTransportKey,List<CachedConnection>> getCache() {
+  private Map<ThriftTransportKey,CachedConnections> getCache() {
     if (cache == null)
       throw new TransportPoolShutdownException();
     return cache;

-- 
To stop receiving notification emails like this one, please contact
ktur...@apache.org.

Reply via email to