This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9da2cd4  GEODE-6536: Added retry in borrowConnection/single hop (#4719)
9da2cd4 is described below

commit 9da2cd49e2e04564b446eaad579b51e986bc2179
Author: Mario Ivanac <48509724+miva...@users.noreply.github.com>
AuthorDate: Thu Feb 27 07:38:31 2020 +0100

    GEODE-6536: Added retry in borrowConnection/single hop (#4719)
    
    * GEODE-6536: Added retry in borrowConnection/single hop
    
    * GEODE-6536: bug fix
    
    * GEODE-6536: update after comments
---
 .../pooling/ConnectionManagerImplTest.java         | 32 +++++++--------
 .../pooling/ConnectionManagerJUnitTest.java        |  6 +--
 .../cache/client/internal/OpExecutorImpl.java      |  2 +-
 .../geode/cache/client/internal/PoolImpl.java      |  8 +++-
 .../client/internal/pooling/ConnectionManager.java |  5 ++-
 .../internal/pooling/ConnectionManagerImpl.java    | 48 +++++++++++++++-------
 .../client/internal/OpExecutorImplJUnitTest.java   |  2 +-
 7 files changed, 64 insertions(+), 39 deletions(-)

diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java
 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java
index 542a8fe..748f37b 100644
--- 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java
+++ 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java
@@ -94,7 +94,7 @@ public class ConnectionManagerImplTest {
     ServerLocation serverLocation = mock(ServerLocation.class);
 
     connectionManager = createDefaultConnectionManager();
-    assertThatThrownBy(() -> 
connectionManager.borrowConnection(serverLocation, true))
+    assertThatThrownBy(() -> 
connectionManager.borrowConnection(serverLocation, timeout, true))
         .isInstanceOf(AllConnectionsInUseException.class);
 
     connectionManager.close(false);
@@ -110,7 +110,7 @@ public class ConnectionManagerImplTest {
     connectionManager = createDefaultConnectionManager();
     connectionManager.start(backgroundProcessor);
 
-    assertThat(connectionManager.borrowConnection(serverLocation, false))
+    assertThat(connectionManager.borrowConnection(serverLocation, timeout, 
false))
         .isInstanceOf(PooledConnection.class);
     assertThat(connectionManager.getConnectionCount()).isEqualTo(1);
 
@@ -266,9 +266,9 @@ public class ConnectionManagerImplTest {
         cancelCriterion, poolStats);
     connectionManager.start(backgroundProcessor);
 
-    connectionManager.borrowConnection(serverLocation1, false);
-    connectionManager.borrowConnection(serverLocation2, false);
-    connectionManager.borrowConnection(serverLocation3, false);
+    connectionManager.borrowConnection(serverLocation1, timeout, false);
+    connectionManager.borrowConnection(serverLocation2, timeout, false);
+    connectionManager.borrowConnection(serverLocation3, timeout, false);
 
     
assertThat(connectionManager.getConnectionCount()).isGreaterThan(maxConnections);
 
@@ -295,9 +295,9 @@ public class ConnectionManagerImplTest {
     connectionManager = createDefaultConnectionManager();
     connectionManager.start(backgroundProcessor);
     Connection heldConnection1 =
-        connectionManager.borrowConnection(serverLocation1, false);
+        connectionManager.borrowConnection(serverLocation1, timeout, false);
     Connection heldConnection2 =
-        connectionManager.borrowConnection(serverLocation2, false);
+        connectionManager.borrowConnection(serverLocation2, timeout, false);
     assertThat(connectionManager.getConnectionCount()).isEqualTo(2);
 
     connectionManager.returnConnection(heldConnection1, true);
@@ -352,11 +352,11 @@ public class ConnectionManagerImplTest {
     connectionManager.start(backgroundProcessor);
 
     Connection heldConnection1 =
-        connectionManager.borrowConnection(serverLocation1, false);
+        connectionManager.borrowConnection(serverLocation1, timeout, false);
     Connection heldConnection2 =
-        connectionManager.borrowConnection(serverLocation2, false);
+        connectionManager.borrowConnection(serverLocation2, timeout, false);
     Connection heldConnection3 =
-        connectionManager.borrowConnection(serverLocation3, false);
+        connectionManager.borrowConnection(serverLocation3, timeout, false);
 
     
assertThat(connectionManager.getConnectionCount()).isGreaterThan(maxConnections);
 
@@ -391,7 +391,7 @@ public class ConnectionManagerImplTest {
     connectionManager = createDefaultConnectionManager();
     connectionManager.start(backgroundProcessor);
 
-    Connection heldConnection = 
connectionManager.borrowConnection(serverLocation1, false);
+    Connection heldConnection = 
connectionManager.borrowConnection(serverLocation1, timeout, false);
     heldConnection = connectionManager.exchangeConnection(heldConnection, 
excluded);
 
     assertThat(heldConnection.getServer()).isEqualTo(connection2.getServer());
@@ -435,9 +435,9 @@ public class ConnectionManagerImplTest {
         cancelCriterion, poolStats);
     connectionManager.start(backgroundProcessor);
 
-    Connection heldConnection = 
connectionManager.borrowConnection(serverLocation1, false);
-    connectionManager.borrowConnection(serverLocation2, false);
-    connectionManager.borrowConnection(serverLocation3, false);
+    Connection heldConnection = 
connectionManager.borrowConnection(serverLocation1, timeout, false);
+    connectionManager.borrowConnection(serverLocation2, timeout, false);
+    connectionManager.borrowConnection(serverLocation3, timeout, false);
     
assertThat(connectionManager.getConnectionCount()).isGreaterThan(maxConnections);
 
     heldConnection = connectionManager.exchangeConnection(heldConnection, 
excluded);
@@ -470,9 +470,9 @@ public class ConnectionManagerImplTest {
     connectionManager.start(backgroundProcessor);
 
     Connection heldConnection1 =
-        connectionManager.borrowConnection(serverLocation1, false);
+        connectionManager.borrowConnection(serverLocation1, timeout, false);
     Connection heldConnection2 =
-        connectionManager.borrowConnection(serverLocation2, false);
+        connectionManager.borrowConnection(serverLocation2, timeout, false);
 
     connectionManager.returnConnection(heldConnection2);
     heldConnection2 = connectionManager.exchangeConnection(heldConnection1, 
excluded);
diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java
 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java
index 4f20d91..de00ce4 100644
--- 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java
+++ 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java
@@ -341,9 +341,9 @@ public class ConnectionManagerJUnitTest {
 
     // Ok, now get some connections that fill our queue
     Connection ping1 =
-        manager.borrowConnection(new ServerLocation("localhost", 5), false);
+        manager.borrowConnection(new ServerLocation("localhost", 5), 
BORROW_TIMEOUT_MILLIS, false);
     Connection ping2 =
-        manager.borrowConnection(new ServerLocation("localhost", 5), false);
+        manager.borrowConnection(new ServerLocation("localhost", 5), 
BORROW_TIMEOUT_MILLIS, false);
     manager.returnConnection(ping1);
     manager.returnConnection(ping2);
 
@@ -692,7 +692,7 @@ public class ConnectionManagerJUnitTest {
       // do nothing
     }
 
-    Connection conn3 = manager.borrowConnection(new 
ServerLocation("localhost", -2), false);
+    Connection conn3 = manager.borrowConnection(new 
ServerLocation("localhost", -2), 10, false);
     Assert.assertEquals(2, factory.creates);
     Assert.assertEquals(0, factory.destroys);
     Assert.assertEquals(0, factory.closes);
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
index a1e377e..869869b 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
@@ -323,7 +323,7 @@ public class OpExecutorImpl implements ExecutablePool {
       }
     }
     if (conn == null) {
-      conn = connectionManager.borrowConnection(p_server, onlyUseExistingCnx);
+      conn = connectionManager.borrowConnection(p_server, serverTimeout, 
onlyUseExistingCnx);
     }
     try {
       return executeWithPossibleReAuthentication(conn, op);
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
index 4e94a8d..c83fce3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
@@ -918,10 +918,14 @@ public class PoolImpl implements InternalPool {
   }
 
   /**
-   * Test hook that acquires and returns a connection from the pool with a 
given ServerLocation.
+   * Borrows a connection to a specific server from the pool. Used by gateway 
and tests. Any
+   * connection
+   * that is acquired using this method must be returned using 
returnConnection, even if it is
+   * destroyed.
+   *
    */
   public Connection acquireConnection(ServerLocation loc) {
-    return manager.borrowConnection(loc, false);
+    return manager.borrowConnection(loc, 15000L, false);
   }
 
   /**
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java
 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java
index 7021cdd..8861821 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java
@@ -56,17 +56,18 @@ public interface ConnectionManager {
    * no connection is available.
    *
    * @param server The server the connection needs to be to.
+   * @param aquireTimeout The amount of time to wait for a connection to 
become available, if
+   *        onlyUseExistingCnx is set to true.
    * @param onlyUseExistingCnx if true, will not create a new connection if 
none are available.
    * @return A connection to use.
    * @throws AllConnectionsInUseException If there is no available connection 
on the desired server,
    *         and onlyUseExistingCnx is set.
-   * @throws ServerOperationException If there is an issue creating the 
connection due to security
    * @throws NoAvailableServersException If we can't connect to any server
    * @throws ServerConnectivityException If finding a connection and creating 
a connection both fail
    *         to return a connection
    *
    */
-  Connection borrowConnection(ServerLocation server, boolean 
onlyUseExistingCnx)
+  Connection borrowConnection(ServerLocation server, long aquireTimeout, 
boolean onlyUseExistingCnx)
       throws AllConnectionsInUseException, NoAvailableServersException;
 
   /**
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
index b3bffc9..84d9570 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
@@ -301,31 +301,51 @@ public class ConnectionManagerImpl implements 
ConnectionManager {
     throw new AllConnectionsInUseException();
   }
 
-  /**
-   * Borrow a connection to a specific server. This task currently allows us 
to break the connection
-   * limit, because it is used by tasks from the background thread that 
shouldn't be constrained by
-   * the limit. They will only violate the limit by 1 connection, and that 
connection will be
-   * destroyed when returned to the pool.
-   */
   @Override
-  public PooledConnection borrowConnection(ServerLocation server,
-      boolean onlyUseExistingCnx) throws AllConnectionsInUseException, 
NoAvailableServersException {
+  public PooledConnection borrowConnection(ServerLocation server, long 
acquireTimeout,
+      boolean onlyUseExistingCnx)
+      throws AllConnectionsInUseException, NoAvailableServersException,
+      ServerConnectivityException {
+
     PooledConnection connection =
         availableConnectionManager.useFirst((c) -> 
c.getServer().equals(server));
     if (null != connection) {
       return connection;
     }
 
-    if (onlyUseExistingCnx) {
-      throw new AllConnectionsInUseException();
+    if (!onlyUseExistingCnx) {
+      connection = forceCreateConnection(server);
+      if (null != connection) {
+        return connection;
+      }
+      throw new ServerConnectivityException(BORROW_CONN_ERROR_MSG + server);
     }
 
-    connection = forceCreateConnection(server);
-    if (null != connection) {
-      return connection;
+    long waitStart = NOT_WAITING;
+    try {
+      long timeout = System.nanoTime() + MILLISECONDS.toNanos(acquireTimeout);
+      while (true) {
+        connection =
+            availableConnectionManager.useFirst((c) -> 
c.getServer().equals(server));
+        if (null != connection) {
+          return connection;
+        }
+
+        if (checkShutdownInterruptedOrTimeout(timeout)) {
+          break;
+        }
+
+        waitStart = beginConnectionWaitStatIfNotStarted(waitStart);
+
+        Thread.yield();
+      }
+    } finally {
+      endConnectionWaitStatIfStarted(waitStart);
     }
 
-    throw new ServerConnectivityException(BORROW_CONN_ERROR_MSG + server);
+    cancelCriterion.checkCancelInProgress(null);
+
+    throw new AllConnectionsInUseException();
   }
 
   @Override
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
index 752ba93..068001a 100644
--- 
a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
@@ -453,7 +453,7 @@ public class OpExecutorImplJUnitTest {
     }
 
     @Override
-    public Connection borrowConnection(ServerLocation server,
+    public Connection borrowConnection(ServerLocation server, long 
aquireTimeout,
         boolean onlyUseExistingCnx) {
       borrows++;
       return new DummyConnection(server);

Reply via email to