This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new cb82c21  GEODE-4615 Deadlock shutting down client cache
cb82c21 is described below

commit cb82c21f22fc8a56958f4091fed5d90dc055190e
Author: Bruce Schuchardt <bschucha...@pivotal.io>
AuthorDate: Fri Feb 9 14:08:38 2018 -0800

    GEODE-4615 Deadlock shutting down client cache
    
    Disallow adding a new connection to the ConnectionMap while it is being
    closed.
    
    This revision swaps the connection map's connection list for a
    "poisoned" list while holding the map's lock, and then closes each
    connection after releasing that lock.  This removes the lock inversion
    because the closing thread no longer holds the map's lock while it
    closes connections.  A thread adding a connection will either see
    closing==true or attempt to add the connection to the poisoned list;
    in both cases it throws a CacheClosedException.
---
 .../internal/pooling/ConnectionManagerImpl.java    | 144 +++++++++++----------
 .../client/internal/pooling/PooledConnection.java  |   4 +
 .../pooling/ConnectionManagerJUnitTest.java        |  73 +++++++++++
 .../apache/geode/codeAnalysis/excludedClasses.txt  |   1 +
 4 files changed, 151 insertions(+), 71 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
index c75d545..58db0b8 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
@@ -16,6 +16,7 @@ package org.apache.geode.cache.client.internal.pooling;
 
 import java.net.SocketException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -37,6 +38,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
 import org.apache.geode.SystemFailure;
+import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.GatewayConfigurationException;
 import org.apache.geode.cache.client.AllConnectionsInUseException;
 import org.apache.geode.cache.client.NoAvailableServersException;
@@ -51,7 +53,6 @@ import org.apache.geode.cache.client.internal.PoolImpl;
 import org.apache.geode.cache.client.internal.PoolImpl.PoolTask;
 import org.apache.geode.cache.client.internal.QueueConnectionImpl;
 import org.apache.geode.distributed.PoolCancelledException;
-import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.i18n.StringId;
 import org.apache.geode.internal.cache.PoolManagerImpl;
@@ -72,13 +73,9 @@ import org.apache.geode.security.GemFireSecurityException;
 public class ConnectionManagerImpl implements ConnectionManager {
   private static final Logger logger = LogService.getLogger();
 
-  static long AQUIRE_TIMEOUT = Long
-      .getLong(DistributionConfig.GEMFIRE_PREFIX + 
"ConnectionManager.AQUIRE_TIMEOUT", 10 * 1000)
-      .longValue();
   private final String poolName;
   private final PoolStats poolStats;
-  protected final long prefillRetry; // ms // make this an int
-  // private final long pingInterval; // ms // make this an int
+  protected final long prefillRetry; // ms
   private final LinkedList/* <PooledConnection> */ availableConnections =
       new LinkedList/* <PooledConnection> */();
   protected final ConnectionMap allConnectionsMap = new ConnectionMap();
@@ -1027,9 +1024,14 @@ public class ConnectionManagerImpl implements 
ConnectionManager {
 
   protected class ConnectionMap {
     private final HashMap/* <Endpoint, HashSet<PooledConnection> */ map = new 
HashMap();
-    private final LinkedList/* <PooledConnection> */ allConnections =
-        new LinkedList/* <PooledConnection> */(); // in the order they were 
created
+    private List/* <PooledConnection> */ allConnections = new LinkedList/* 
<PooledConnection> */(); // in
+                                                                               
                     // the
+                                                                               
                     // order
+                                                                               
                     // they
+                                                                               
                     // were
+                                                                               
                     // created
     private boolean haveLifetimeExpireConnectionsTask;
+    volatile boolean closing;
 
     public synchronized boolean isIdleExpirePossible() {
       return this.allConnections.size() > minConnections;
@@ -1059,14 +1061,17 @@ public class ConnectionManagerImpl implements 
ConnectionManager {
     }
 
     public synchronized void addConnection(PooledConnection connection) {
-      addToEndpointMap(connection);
+      if (this.closing) {
+        throw new CacheClosedException("This pool is closing");
+      }
 
-      // we want the smallest birthDate (e.g. oldest cnx) at the front of the 
list
       getPoolStats().incPoolConnections(1);
-      // logger.info("DEBUG: addConnection incPoolConnections(1)->" +
-      // getPoolStats().getPoolConnections() + " con="+connection,
-      // new RuntimeException("STACK"));
-      this.allConnections.addLast(connection);
+
+      // we want the smallest birthDate (e.g. oldest cnx) at the front of the 
list
+      this.allConnections.add(connection);
+
+      addToEndpointMap(connection);
+
       if (isIdleExpirePossible()) {
         startBackgroundExpiration();
       }
@@ -1082,11 +1087,14 @@ public class ConnectionManagerImpl implements 
ConnectionManager {
     }
 
     public synchronized void addReplacedCnx(PooledConnection con, Endpoint 
oldEndpoint) {
+      if (this.closing) {
+        throw new CacheClosedException("This pool is closing");
+      }
       if (this.allConnections.remove(con)) {
         // otherwise someone else has removed it and closed it
         removeFromEndpointMap(oldEndpoint, con);
         addToEndpointMap(con);
-        this.allConnections.addLast(con);
+        this.allConnections.add(con);
         if (isIdleExpirePossible()) {
           startBackgroundExpiration();
         }
@@ -1155,11 +1163,21 @@ public class ConnectionManagerImpl implements 
ConnectionManager {
       }
     }
 
-    public synchronized void close(boolean keepAlive) {
-      map.clear();
+    public void close(boolean keepAlive) {
+      List<PooledConnection> connections;
       int count = 0;
-      while (!this.allConnections.isEmpty()) {
-        PooledConnection pc = (PooledConnection) 
this.allConnections.removeFirst();
+
+      synchronized (this) {
+        if (closing) {
+          return;
+        }
+        closing = true;
+        map.clear();
+        connections = allConnections;
+        allConnections = new ClosedPoolConnectionList();
+      }
+
+      for (PooledConnection pc : connections) {
         count++;
         if (!pc.isDestroyed()) {
           try {
@@ -1183,9 +1201,10 @@ public class ConnectionManagerImpl implements 
ConnectionManager {
     }
 
     public synchronized void emergencyClose() {
+      closing = true;
       map.clear();
       while (!this.allConnections.isEmpty()) {
-        PooledConnection pc = (PooledConnection) 
this.allConnections.removeFirst();
+        PooledConnection pc = (PooledConnection) this.allConnections.remove(0);
         pc.emergencyClose();
       }
     }
@@ -1275,56 +1294,6 @@ public class ConnectionManagerImpl implements 
ConnectionManager {
     }
 
     /**
-     * See if any of the expired connections (that have not idle expired) are 
already connected to
-     * this sl and have not idle expired. If so then just update them in-place 
to simulate a
-     * replace.
-     *
-     * @param sl the location of the server we should see if we are connected 
to
-     * @return true if we were able to extend an existing connection's 
lifetime or if we have no
-     *         connection's whose lifetime has expired. false if we need to 
create a replacement
-     *         connection.
-     */
-    public synchronized boolean tryToExtendLifeTime(ServerLocation sl) {
-      // a better approach might be to get the most loaded server
-      // (if they are not balanced) and then scan through and extend the 
lifetime
-      // of everyone not connected to that server and do a replace on just one
-      // of the guys who has lifetime expired to the most loaded server
-      boolean result = true;
-      if (!this.allConnections.isEmpty()) {
-        final long now = System.nanoTime();
-        for (Iterator it = this.allConnections.iterator(); it.hasNext();) {
-          PooledConnection pc = (PooledConnection) it.next();
-          if (pc.remainingLife(now, lifetimeTimeoutNanos) > 0) {
-            // no more connections whose lifetime could have expired
-            break;
-            // note don't ignore idle guys because they are still connected
-            // } else if (pc.remainingIdle(now, idleTimeoutNanos) <= 0) {
-            // // this con has already idle expired so ignore it
-          } else if (pc.shouldDestroy()) {
-            // this con has already been destroyed so ignore it
-          } else if (sl.equals(pc.getEndpoint().getLocation())) {
-            // we found a guy to whose lifetime we can extend
-            it.remove();
-            // logger.fine("DEBUG: tryToExtendLifeTime extending life of: " + 
pc);
-            pc.setBirthDate(now);
-            getPoolStats().incLoadConditioningExtensions();
-            this.allConnections.addLast(pc);
-            return true;
-          } else {
-            // the current pc is a candidate for reconnection to another server
-            // so set result to false which will stick unless we find another 
con
-            // whose life can be extended.
-            result = false;
-          }
-        }
-      }
-      // if (result) {
-      // logger.fine("DEBUG: tryToExtendLifeTime found no one to extend");
-      // }
-      return result;
-    }
-
-    /**
      * Extend the life of the first expired connection to sl.
      */
     public synchronized void extendLifeOfCnxToServer(ServerLocation sl) {
@@ -1346,7 +1315,7 @@ public class ConnectionManagerImpl implements 
ConnectionManager {
             // logger.fine("DEBUG: tryToExtendLifeTime extending life of: " + 
pc);
             pc.setBirthDate(now);
             getPoolStats().incLoadConditioningExtensions();
-            this.allConnections.addLast(pc);
+            this.allConnections.add(pc);
             // break so we only do this to the oldest guy
             break;
           }
@@ -1530,6 +1499,7 @@ public class ConnectionManagerImpl implements 
ConnectionManager {
       // this should not be needed but seems to currently help.
       startBackgroundLifetimeExpiration(lifetimeTimeoutNanos);
     }
+
   }
 
   private void logInfo(StringId message, Throwable t) {
@@ -1559,4 +1529,36 @@ public class ConnectionManagerImpl implements 
ConnectionManager {
     assert conn instanceof PooledConnection;
     ((PooledConnection) conn).passivate(accessed);
   }
+
+  private static class ClosedPoolConnectionList extends ArrayList {
+    @Override
+    public Object set(int index, Object element) {
+      throw new CacheClosedException("This pool has been closed");
+    }
+
+    @Override
+    public boolean add(Object element) {
+      throw new CacheClosedException("This pool has been closed");
+    }
+
+    @Override
+    public void add(int index, Object element) {
+      throw new CacheClosedException("This pool has been closed");
+    }
+
+    @Override
+    public Object remove(int index) {
+      throw new CacheClosedException("This pool has been closed");
+    }
+
+    @Override
+    public boolean addAll(Collection c) {
+      throw new CacheClosedException("This pool has been closed");
+    }
+
+    @Override
+    public boolean addAll(int index, Collection c) {
+      throw new CacheClosedException("This pool has been closed");
+    }
+  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
index 11413d1..bb167a6 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
@@ -348,6 +348,10 @@ class PooledConnection implements Connection {
     getConnection().setWanSiteVersion(wanSiteVersion);
   }
 
+  public void setConnection(Connection newConnection) {
+    this.connection = newConnection;
+  }
+
   public void setConnectionID(long id) {
     this.connection.setConnectionID(id);
   }
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java
index 60e51e1..3a07b88 100644
--- 
a/geode-core/src/test/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java
@@ -14,8 +14,10 @@
  */
 package org.apache.geode.cache.client.internal.pooling;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.internal.Assert.assertTrue;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.fail;
 
@@ -28,9 +30,11 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -38,11 +42,13 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.client.AllConnectionsInUseException;
 import org.apache.geode.cache.client.NoAvailableServersException;
 import org.apache.geode.cache.client.internal.ClientUpdater;
 import org.apache.geode.cache.client.internal.Connection;
 import org.apache.geode.cache.client.internal.ConnectionFactory;
+import org.apache.geode.cache.client.internal.ConnectionImpl;
 import org.apache.geode.cache.client.internal.ConnectionStats;
 import org.apache.geode.cache.client.internal.Endpoint;
 import org.apache.geode.cache.client.internal.EndpointManager;
@@ -52,6 +58,7 @@ import org.apache.geode.cache.client.internal.QueueManager;
 import org.apache.geode.cache.client.internal.ServerBlackList;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ServerLocation;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.PoolStats;
@@ -586,6 +593,72 @@ public class ConnectionManagerJUnitTest {
     manager.returnConnection(conn4);
   }
 
+  /**
+   * This tests that a deadlock between connection formation and connection 
pool closing has been
+   * fixed. See GEODE-4615
+   */
+  @Test
+  public void testThatMapCloseCausesCacheClosedException() throws Exception {
+    final ConnectionManagerImpl connectionManager = new 
ConnectionManagerImpl("pool", factory,
+        endpointManager, 2, 0, -1, -1, logger, 60 * 1000, cancelCriterion, 
poolStats);
+    manager = connectionManager;
+    connectionManager.start(background);
+    final ConnectionManagerImpl.ConnectionMap connectionMap = 
connectionManager.allConnectionsMap;
+
+    final int thread1 = 0;
+    final int thread2 = 1;
+    final boolean[] ready = new boolean[2];
+    Thread thread = new Thread("ConnectionManagerJUnitTest thread") {
+      public void run() {
+        setReady(ready, thread1);
+        waitUntilReady(ready, thread2);
+        connectionMap.close(false);
+      }
+    };
+    thread.setDaemon(true);
+    thread.start();
+    try {
+      Connection firstConnection = connectionManager.borrowConnection(0);
+      synchronized (firstConnection) {
+        setReady(ready, thread2);
+        waitUntilReady(ready, thread1);
+        // the other thread will now try to close the connection map but it 
will block
+        // because this thread has locked one of the connections
+        Awaitility.await().atMost(5, SECONDS).until(() -> 
connectionMap.closing);
+        try {
+          connectionManager.borrowConnection(0);
+          fail("expected a CacheClosedException");
+        } catch (CacheClosedException e) {
+          // expected
+        }
+      }
+    } finally {
+      if (thread.isAlive()) {
+        System.out.println("stopping background thread");
+        thread.interrupt();
+        thread.join();
+      }
+    }
+  }
+
+  private void setReady(boolean[] ready, int index) {
+    System.out.println(
+        Thread.currentThread().getName() + ": setting that thread" + (index + 
1) + " is ready");
+    synchronized (ready) {
+      ready[index] = true;
+    }
+  }
+
+  private void waitUntilReady(boolean[] ready, int index) {
+    System.out.println(
+        Thread.currentThread().getName() + ": waiting for thread" + (index + 
1) + " to be ready");
+    Awaitility.await().atMost(20, SECONDS).until(() -> {
+      synchronized (ready) {
+        return (ready[index]);
+      }
+    });
+  }
+
   @Test
   public void testBlocking() throws Throwable {
     manager = new ConnectionManagerImpl("pool", factory, endpointManager, 1, 
0, -1, -1, logger,
diff --git 
a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
 
b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index 32a787b..361e016 100644
--- 
a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ 
b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -88,3 +88,4 @@ 
org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$2
 org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$3
 org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$4
 org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$5
+org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl$ClosedPoolConnectionList

-- 
To stop receiving notification emails like this one, please contact
bschucha...@apache.org.

Reply via email to