This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3ef1905  GEODE-8817: server hangs in cache.close() (#5987)
3ef1905 is described below

commit 3ef1905e8b88d99ac3071f94d830ae23010e3d2c
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Tue Feb 9 07:56:45 2021 -0800

    GEODE-8817: server hangs in cache.close() (#5987)
    
    * possible fix
    
    * Revert "possible fix"
    
    This reverts commit f8451da33d3568c1900bda2b7a4bb3f4fef660ba.
    
    Testing showed that SSLSocket.close() still hangs when there is a
    network failure and another thread is reading from the socket.
    
    * revert socket timeout and introduce a SocketCloser executor
    
    * SocketCloser cleanup
    
    * change cleanup to be performed after the socket is closed
    
    * bill's comments addressed
    
    * bill's new comments
    
    * removed useless method annotation
---
 .../cache/tier/sockets/CacheClientProxyTest.java   |   2 +-
 .../internal/net/SocketCloserIntegrationTest.java  |  28 +++--
 .../internal/LonerDistributionManager.java         | 137 ---------------------
 .../apache/geode/internal/cache/tier/Acceptor.java |   3 +
 .../internal/cache/tier/sockets/AcceptorImpl.java  |   9 ++
 .../cache/tier/sockets/CacheClientProxy.java       |   4 +-
 .../cache/tier/sockets/ServerConnection.java       |  20 ++-
 .../apache/geode/internal/net/SocketCloser.java    |  72 ++++++-----
 8 files changed, 92 insertions(+), 183 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
index 0158946..079c54e 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
@@ -65,7 +65,7 @@ public class CacheClientProxyTest {
     final InetAddress address = mock(InetAddress.class);
     when(socket.getInetAddress()).thenReturn(address);
     when(address.getHostAddress()).thenReturn("localhost");
-    doNothing().when(sc).asyncClose(any(), eq("localhost"), eq(null));
+    doNothing().when(sc).asyncClose(any(), eq("localhost"), any(Runnable.class));
 
     final ClientProxyMembershipID proxyID = mock(ClientProxyMembershipID.class);
     final DistributedMember member = cache.getDistributedSystem().getDistributedMember();
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SocketCloserIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SocketCloserIntegrationTest.java
index b2db1b8..ee7ac5c 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SocketCloserIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SocketCloserIntegrationTest.java
@@ -113,13 +113,17 @@ public class SocketCloserIntegrationTest {
    * Verify that requesting an asyncClose on an already closed socket is a noop.
    */
   @Test
-  public void testClosedSocket() throws Exception {
-    final AtomicBoolean runnableCalled = new AtomicBoolean();
+  public void testOpenSocketCloser() {
+    final AtomicBoolean beforeSocketCloseRunnableWasCalled = new AtomicBoolean();
+    final AtomicBoolean afterSocketCloseRunnableWasCalled = new AtomicBoolean();
 
-    Socket s = createClosableSocket();
-    s.close();
-    this.socketCloser.asyncClose(s, "A", () -> runnableCalled.set(true));
-    await().until(() -> !runnableCalled.get());
+    final Socket closableSocket = createClosableSocket();
+    this.socketCloser.asyncClose(closableSocket, "A",
+        () -> beforeSocketCloseRunnableWasCalled.set(true),
+        () -> afterSocketCloseRunnableWasCalled.set(true));
+    await().until(() -> beforeSocketCloseRunnableWasCalled.get());
+    await().until(() -> closableSocket.isClosed());
+    await().until(() -> afterSocketCloseRunnableWasCalled.get());
   }
 
   /**
@@ -127,12 +131,16 @@ public class SocketCloserIntegrationTest {
    */
   @Test
   public void testClosedSocketCloser() {
-    final AtomicBoolean runnableCalled = new AtomicBoolean();
+    final AtomicBoolean beforeSocketCloseRunnableWasCalled = new AtomicBoolean();
+    final AtomicBoolean afterSocketCloseRunnableWasCalled = new AtomicBoolean();
 
     final Socket closableSocket = createClosableSocket();
     this.socketCloser.close();
-    this.socketCloser.asyncClose(closableSocket, "A", () -> runnableCalled.set(true));
-    await()
-        .until(() -> runnableCalled.get() && closableSocket.isClosed());
+    this.socketCloser.asyncClose(closableSocket, "A",
+        () -> beforeSocketCloseRunnableWasCalled.set(true),
+        () -> afterSocketCloseRunnableWasCalled.set(true));
+    await().until(() -> beforeSocketCloseRunnableWasCalled.get());
+    await().until(() -> closableSocket.isClosed());
+    await().until(() -> afterSocketCloseRunnableWasCalled.get());
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index 532ca01..08dab41 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -23,14 +23,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.InternalGemFireError;
@@ -1023,139 +1019,6 @@ public class LonerDistributionManager implements DistributionManager {
       return 0;
     }
   }
-  protected static class DummyExecutor implements ExecutorService {
-    @Override
-    public void execute(Runnable command) {
-      command.run();
-    }
-
-    @Override
-    public void shutdown() {}
-
-    @Override
-    public List<Runnable> shutdownNow() {
-      return Collections.emptyList();
-    }
-
-    @Override
-    public boolean isShutdown() {
-      return false;
-    }
-
-    @Override
-    public boolean isTerminated() {
-      return false;
-    }
-
-    @Override
-    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
-      return true;
-    }
-
-    @Override
-    public <T> Future<T> submit(final Callable<T> task) {
-      Exception ex = null;
-      T result = null;
-      try {
-        result = task.call();
-      } catch (Exception e) {
-        ex = e;
-      }
-      return new CompletedFuture<T>(result, ex);
-    }
-
-    @Override
-    public <T> Future<T> submit(final Runnable task, final T result) {
-      return submit(new Callable<T>() {
-        @Override
-        public T call() throws Exception {
-          task.run();
-          return result;
-        }
-      });
-    }
-
-    @Override
-    public Future<?> submit(Runnable task) {
-      return submit(task, null);
-    }
-
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException {
-      List<Future<T>> results = new ArrayList<Future<T>>();
-      for (Callable<T> task : tasks) {
-        results.add(submit(task));
-      }
-      return results;
-    }
-
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
-        TimeUnit unit) throws InterruptedException {
-      return invokeAll(tasks);
-    }
-
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException, ExecutionException {
-
-      ExecutionException ex = null;
-      for (Callable<T> task : tasks) {
-        try {
-          return submit(task).get();
-        } catch (ExecutionException e) {
-          ex = e;
-        }
-      }
-      throw (ExecutionException) ex.fillInStackTrace();
-    }
-
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-      return invokeAny(tasks);
-    }
-  }
-
-  private static class CompletedFuture<T> implements Future<T> {
-    private final T result;
-    private final Exception ex;
-
-    public CompletedFuture(T result, Exception ex) {
-      this.result = result;
-      this.ex = ex;
-    }
-
-    @Override
-    public boolean cancel(boolean mayInterruptIfRunning) {
-      return false;
-    }
-
-    @Override
-    public boolean isCancelled() {
-      return false;
-    }
-
-    @Override
-    public boolean isDone() {
-      return true;
-    }
-
-    @Override
-    public T get() throws InterruptedException, ExecutionException {
-      if (ex != null) {
-        throw new ExecutionException(ex);
-      }
-      return result;
-    }
-
-    @Override
-    public T get(long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-      return get();
-    }
-  }
 
   @Override
   public void throwIfDistributionStopped() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
index 2982549..b5c0a34 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
@@ -25,6 +25,7 @@ import org.apache.geode.internal.cache.tier.sockets.ClientHealthMonitor;
 import org.apache.geode.internal.cache.tier.sockets.CommBufferPool;
 import org.apache.geode.internal.cache.tier.sockets.ConnectionListener;
 import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.internal.net.SocketCloser;
 
 /**
  * Defines the message listener/acceptor interface which is the GemFire cache server. Multiple
@@ -107,4 +108,6 @@ public interface Acceptor extends CommBufferPool {
   void decClientServerConnectionCount();
 
   int getMaximumTimeBetweenPings();
+
+  SocketCloser getSocketCloser();
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 1dd4cb7..e7d71dc 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -93,6 +93,7 @@ import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
 import org.apache.geode.internal.inet.LocalHostUtil;
 import org.apache.geode.internal.logging.CoreLoggingExecutors;
 import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.net.SocketCloser;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.internal.serialization.KnownVersion;
@@ -349,6 +350,8 @@ public class AcceptorImpl implements Acceptor, Runnable {
 
   private final ServerConnectionFactory serverConnectionFactory;
 
+  private final SocketCloser socketCloser = new SocketCloser();
+
   /**
    * Constructs an AcceptorImpl for use within a CacheServer.
    *
@@ -1707,6 +1710,7 @@ public class AcceptorImpl implements Acceptor, Runnable {
       for (ServerConnection serverConnection : snap) {
         serverConnection.cleanup();
       }
+      socketCloser.close();
     }
   }
 
@@ -1872,6 +1876,11 @@ public class AcceptorImpl implements Acceptor, Runnable {
     return maximumTimeBetweenPings;
   }
 
+  @Override
+  public SocketCloser getSocketCloser() {
+    return socketCloser;
+  }
+
   private static class ClientQueueInitializerTask implements Runnable {
     private final Socket socket;
     private final boolean isPrimaryServerToClient;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index c17b302..ec006ca 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -954,7 +954,9 @@ public class CacheClientProxy implements ClientSession {
     String remoteHostAddress = this._remoteHostAddress;
     if (this._socketClosed.compareAndSet(false, true) && remoteHostAddress != null) {
       // Only one thread is expected to close the socket
-      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, remoteHostAddress, null);
+      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, remoteHostAddress,
+          () -> {
+          });
       getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
       return true;
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 29eec6a..625da9e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -1412,11 +1412,22 @@ public abstract class ServerConnection implements Runnable {
       getAcceptor().decClientServerConnectionCount();
     }
 
-    try {
-      theSocket.close();
-    } catch (Exception ignored) {
-    }
+    if (!theSocket.isClosed()) {
+      // Here we direct closing of sockets to one of two executors. Use of an executor
+      // keeps us from causing an explosion of new threads when a server is shut down.
+      // Background threads are used in case the close() operation on the socket hangs.
+      final String closerName =
+          communicationMode.isWAN() ? "WANSocketCloser" : "CacheServerSocketCloser";
+      acceptor.getSocketCloser().asyncClose(theSocket, closerName, () -> {
+      },
+          () -> cleanupAfterSocketClose());
+      return true;
+    }
+    cleanupAfterSocketClose();
+    return true;
+  }
 
+  protected void cleanupAfterSocketClose() {
     try {
       if (postAuthzRequest != null) {
         postAuthzRequest.close();
@@ -1437,7 +1448,6 @@ public abstract class ServerConnection implements Runnable {
     }
     releaseCommBuffer();
     processMessages = false;
-    return true;
   }
 
   private void releaseCommBuffer() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
index 15a129e..3613550 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
@@ -16,6 +16,7 @@ package org.apache.geode.internal.net;
 
 import java.io.IOException;
 import java.net.Socket;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -25,6 +26,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.jetbrains.annotations.NotNull;
+
 import org.apache.geode.SystemFailure;
 import org.apache.geode.logging.internal.executors.LoggingExecutors;
 
@@ -66,7 +69,6 @@ public class SocketCloser {
    */
   private final ConcurrentHashMap<String, ExecutorService> asyncCloseExecutors =
       new ConcurrentHashMap<>();
-  private final long asyncClosePoolKeepAliveSeconds;
   private final int asyncClosePoolMaxThreads;
   private final long asyncCloseWaitTime;
   private final TimeUnit asyncCloseWaitUnits;
@@ -91,7 +93,6 @@ public class SocketCloser {
 
   public SocketCloser(long asyncClosePoolKeepAliveSeconds, int asyncClosePoolMaxThreads,
       long asyncCloseWaitTime, TimeUnit asyncCloseWaitUnits) {
-    this.asyncClosePoolKeepAliveSeconds = asyncClosePoolKeepAliveSeconds;
     this.asyncClosePoolMaxThreads = asyncClosePoolMaxThreads;
     this.asyncCloseWaitTime = asyncCloseWaitTime;
     this.asyncCloseWaitUnits = asyncCloseWaitUnits;
@@ -163,60 +164,73 @@ public class SocketCloser {
   }
 
   /**
-   * Closes the specified socket in a background thread. In some cases we see close hang (see bug
-   * 33665). Depending on how the SocketCloser is configured (see ASYNC_CLOSE_WAIT_MILLISECONDS)
+   * Closes the specified socket in a background thread. In some cases we see close hang.
+   * Depending on how the SocketCloser is configured (see ASYNC_CLOSE_WAIT_MILLISECONDS)
+   * this method may block for a certain amount of time. If it is called after the SocketCloser is
+   * closed then a normal synchronous close is done.
+   *
+   * @param socket the socket to close
+   * @param address key used to determine which executor to use. Usually the address of a peer or
+   *        client
+   * @param runBeforeClose a Runnable with stuff to execute before the socket is closed
+   */
+  public void asyncClose(final Socket socket, final String address, final Runnable runBeforeClose) {
+    asyncClose(socket, address, runBeforeClose, () -> {
+    });
+  }
+
+  /**
+   * Closes the specified socket in a background thread. In some cases we see close hang.
+   * Depending on how the SocketCloser is configured (see ASYNC_CLOSE_WAIT_MILLISECONDS)
    * this method may block for a certain amount of time. If it is called after the SocketCloser is
    * closed then a normal synchronous close is done.
    *
    * @param socket the socket to close
-   * @param address identifies who the socket is connected to
-   * @param extra an optional Runnable with stuff to execute before the socket is closed
+   * @param address key used to determine which executor to use. Usually the address of a peer or
+   *        client
+   * @param runBeforeClose a Runnable with stuff to execute before the socket is closed
+   * @param runAfterClose a Runnable with stuff to execute after the socket is closed
    */
-  public void asyncClose(final Socket socket, final String address, final Runnable extra) {
-    if (socket == null || socket.isClosed()) {
+  public void asyncClose(@NotNull final Socket socket, @NotNull final String address,
+      @NotNull final Runnable runBeforeClose,
+      @NotNull final Runnable runAfterClose) {
+    if (socket.isClosed()) {
       return;
     }
     boolean doItInline = false;
     try {
-      Future submittedTask = null;
+      Future submittedTask = CompletableFuture.completedFuture(this);
       closedLock.lock();
       try {
         if (closed) {
           // this SocketCloser has been closed so do a synchronous, inline, 
close
           doItInline = true;
         } else {
-          submittedTask = asyncExecute(address, new Runnable() {
-            @Override
-            public void run() {
-              Thread.currentThread().setName("AsyncSocketCloser for " + address);
-              try {
-                if (extra != null) {
-                  extra.run();
-                }
-                inlineClose(socket);
-              } finally {
-                Thread.currentThread().setName("unused AsyncSocketCloser");
-              }
+          submittedTask = asyncExecute(address, () -> {
+            Thread.currentThread().setName("AsyncSocketCloser for " + address);
+            try {
+              runBeforeClose.run();
+              inlineClose(socket);
+              runAfterClose.run();
+            } finally {
+              Thread.currentThread().setName("unused AsyncSocketCloser");
             }
           });
         }
       } finally {
         closedLock.unlock();
       }
-      if (submittedTask != null) {
+      if (!doItInline) {
         waitForFutureTaskWithTimeout(submittedTask);
+        return;
       }
     } catch (RejectedExecutionException | OutOfMemoryError ignore) {
       // If we can't start a thread to close the socket just do it inline.
       // See bug 50573.
-      doItInline = true;
-    }
-    if (doItInline) {
-      if (extra != null) {
-        extra.run();
-      }
-      inlineClose(socket);
     }
+    runBeforeClose.run();
+    inlineClose(socket);
+    runAfterClose.run();
   }
 
   private void waitForFutureTaskWithTimeout(Future submittedTask) {

Reply via email to