GEODE-3416: Clean up SocketCloser code to reduce synchronization
Old code would force single-threaded behavior; new code synchronizes only
on the checking or changing of the closed flag


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/821e03dc
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/821e03dc
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/821e03dc

Branch: refs/heads/feature/GEODE-3416
Commit: 821e03dc6f45a6a9a4c43045dfbb235a626c2ec7
Parents: ea2fc82
Author: Udo Kohlmeyer <ukohlme...@pivotal.io>
Authored: Fri Aug 11 09:45:45 2017 -0700
Committer: Udo Kohlmeyer <ukohlme...@pivotal.io>
Committed: Fri Aug 11 09:51:34 2017 -0700

----------------------------------------------------------------------
 .../cache/tier/sockets/CacheClientProxy.java    |   4 +-
 .../apache/geode/internal/net/SocketCloser.java | 192 +++++++++++++------
 .../apache/geode/internal/tcp/Connection.java   |   4 +-
 .../internal/net/SocketCloserJUnitTest.java     |  10 +-
 4 files changed, 140 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/821e03dc/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 98bfed9..34f232d 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -949,7 +949,8 @@ public class CacheClientProxy implements ClientSession {
   private void closeSocket() {
     if (this._socketClosed.compareAndSet(false, true)) {
       // Close the socket
-      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, 
null);
+      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, 
this._remoteHostAddress,
+          null);
       getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
     }
   }
@@ -963,6 +964,7 @@ public class CacheClientProxy implements ClientSession {
     {
       String remoteHostAddress = this._remoteHostAddress;
       if (remoteHostAddress != null) {
+        
this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
         this._remoteHostAddress = null;
       }
     }

http://git-wip-us.apache.org/repos/asf/geode/blob/821e03dc/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java 
b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
index fbbe797..46d69a8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
@@ -21,8 +21,10 @@ import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 import java.net.Socket;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -49,27 +51,32 @@ public class SocketCloser {
    * minutes).
    */
   static final long ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS =
-      Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120);
+      Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120).longValue();
   /**
    * Maximum number of threads that can be doing a socket close. Any close 
requests over this max
    * will queue up waiting for a thread.
    */
-  private static final int ASYNC_CLOSE_POOL_MAX_THREADS =
-      Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8);
+  static final int ASYNC_CLOSE_POOL_MAX_THREADS =
+      Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 4).intValue();
   /**
    * How many milliseconds the synchronous requester waits for the async close 
to happen. Default is
    * 0. Prior releases waited 50ms.
    */
-  private static final long ASYNC_CLOSE_WAIT_MILLISECONDS =
-      Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0);
+  static final long ASYNC_CLOSE_WAIT_MILLISECONDS =
+      Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0).longValue();
 
 
+  /**
+   * map of thread pools of async close threads
+   */
+  private final ConcurrentHashMap<String, ExecutorService>
+      asyncCloseExecutors =
+      new ConcurrentHashMap<>();
   private final long asyncClosePoolKeepAliveSeconds;
   private final int asyncClosePoolMaxThreads;
   private final long asyncCloseWaitTime;
   private final TimeUnit asyncCloseWaitUnits;
-  private boolean closed;
-  private final ExecutorService socketCloserThreadPool;
+  private Boolean closed = Boolean.FALSE;
 
   public SocketCloser() {
     this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, ASYNC_CLOSE_POOL_MAX_THREADS,
@@ -82,30 +89,71 @@ public class SocketCloser {
   }
 
   public SocketCloser(long asyncClosePoolKeepAliveSeconds, int 
asyncClosePoolMaxThreads,
-      long asyncCloseWaitTime, TimeUnit asyncCloseWaitUnits) {
+                      long asyncCloseWaitTime, TimeUnit asyncCloseWaitUnits) {
     this.asyncClosePoolKeepAliveSeconds = asyncClosePoolKeepAliveSeconds;
     this.asyncClosePoolMaxThreads = asyncClosePoolMaxThreads;
     this.asyncCloseWaitTime = asyncCloseWaitTime;
     this.asyncCloseWaitUnits = asyncCloseWaitUnits;
-
-    final ThreadGroup threadGroup =
-        LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
-    ThreadFactory threadFactory = command -> {
-      Thread thread = new Thread(threadGroup, command);
-      thread.setDaemon(true);
-      return thread;
-    };
-    socketCloserThreadPool = new 
ThreadPoolExecutor(this.asyncClosePoolMaxThreads,
-        this.asyncClosePoolMaxThreads, this.asyncClosePoolKeepAliveSeconds, 
TimeUnit.SECONDS,
-        new LinkedBlockingQueue<>(), threadFactory);
   }
 
   public int getMaxThreads() {
     return this.asyncClosePoolMaxThreads;
   }
 
-  private boolean isClosed() {
-    return this.closed;
+  private ExecutorService getAsyncThreadExecutor(String address) {
+    ExecutorService executorService = asyncCloseExecutors.get(address);
+    if (executorService == null) {
+      //To be used for pre-1.8 jdk releases.
+//      createThreadPool();
+
+      executorService = 
Executors.newWorkStealingPool(asyncClosePoolMaxThreads);
+
+      ExecutorService
+          previousThreadPoolExecutor =
+          asyncCloseExecutors.putIfAbsent(address, executorService);
+
+      if (previousThreadPoolExecutor != null) {
+        executorService.shutdownNow();
+        return previousThreadPoolExecutor;
+      }
+    }
+    return executorService;
+  }
+
+  /**
+   * @deprecated this method is to be used for pre 1.8 jdk.
+   */
+  @Deprecated
+  private void createThreadPool() {
+    ExecutorService executorService;
+    final ThreadGroup
+        threadGroup =
+        LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
+    ThreadFactory threadFactory = new ThreadFactory() {
+      public Thread newThread(final Runnable command) {
+        Thread thread = new Thread(threadGroup, command);
+        thread.setDaemon(true);
+        return thread;
+      }
+    };
+
+    executorService =
+        new ThreadPoolExecutor(asyncClosePoolMaxThreads, 
asyncClosePoolMaxThreads,
+            asyncCloseWaitTime,
+            asyncCloseWaitUnits, new LinkedBlockingQueue<>(), threadFactory);
+  }
+
+  /**
+   * Call this method if you know all the resources in the closer for the 
given address are no
+   * longer needed. Currently a thread pool is kept for each address and if 
you know that an address
+   * no longer needs its pool then you should call this method.
+   */
+
+  public void releaseResourcesForAddress(String address) {
+    ExecutorService executorService = asyncCloseExecutors.remove(address);
+    if (executorService != null) {
+      executorService.shutdown();
+    }
   }
 
   /**
@@ -113,84 +161,104 @@ public class SocketCloser {
    * called then the asyncClose will be done synchronously.
    */
   public void close() {
-    if (!this.closed) {
-      this.closed = true;
-      socketCloserThreadPool.shutdown();
+    synchronized (closed) {
+      if (!this.closed) {
+        this.closed = true;
+      } else {
+        return;
+      }
+    }
+    for (ExecutorService executorService : asyncCloseExecutors.values()) {
+      executorService.shutdown();
+      asyncCloseExecutors.clear();
     }
   }
 
+  private Future asyncExecute(String address, Runnable runnableToExecute) {
+    ExecutorService asyncThreadExecutor = getAsyncThreadExecutor(address);
+    return asyncThreadExecutor.submit(runnableToExecute);
+  }
+
   /**
    * Closes the specified socket in a background thread. In some cases we see 
close hang (see bug
    * 33665). Depending on how the SocketCloser is configured (see 
ASYNC_CLOSE_WAIT_MILLISECONDS)
    * this method may block for a certain amount of time. If it is called after 
the SocketCloser is
    * closed then a normal synchronous close is done.
-   * 
-   * @param socket the socket to close
-   * @param runnableCode an optional Runnable with stuff to execute in the 
async thread
+   * @param sock the socket to close
+   * @param address identifies who the socket is connected to
+   * @param extra an optional Runnable with stuff to execute in the async 
thread
    */
-  public void asyncClose(final Socket socket, final Runnable runnableCode) {
-    if (socket == null || socket.isClosed()) {
+  public void asyncClose(final Socket sock, final String address, final 
Runnable extra) {
+    if (sock == null || sock.isClosed()) {
       return;
     }
-
     boolean doItInline = false;
     try {
-      if (isClosed()) {
-        // this SocketCloser has been closed so do a synchronous, inline, close
-        doItInline = true;
-      } else {
-        socketCloserThreadPool.execute(() -> {
-          if (runnableCode != null) {
-            runnableCode.run();
-          }
-          inlineClose(socket);
-        });
-        if (this.asyncCloseWaitTime != 0) {
-          try {
-            Future future = socketCloserThreadPool.submit(() -> {
-              if (runnableCode != null) {
-                runnableCode.run();
+      Future submittedTask = null;
+      synchronized (closed) {
+        if (closed) {
+          // this SocketCloser has been closed so do a synchronous, inline, 
close
+          doItInline = true;
+        } else {
+          submittedTask = asyncExecute(address, new Runnable() {
+            public void run() {
+              Thread.currentThread().setName("AsyncSocketCloser for " + 
address);
+              try {
+                if (extra != null) {
+                  extra.run();
+                }
+                inlineClose(sock);
+              } finally {
+                Thread.currentThread().setName("unused AsyncSocketCloser");
               }
-              inlineClose(socket);
-            });
-            future.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
-          } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
-            // We want this code to wait at most 50ms for the close to happen.
-            // It is ok to ignore these exception and let the close continue
-            // in the background.
-          }
+            }
+          });
         }
       }
+      if (submittedTask != null) {
+        waitForFutureTaskWithTimeout(submittedTask);
+      }
     } catch (OutOfMemoryError ignore) {
       // If we can't start a thread to close the socket just do it inline.
       // See bug 50573.
       doItInline = true;
     }
     if (doItInline) {
-      if (runnableCode != null) {
-        runnableCode.run();
+      if (extra != null) {
+        extra.run();
       }
-      inlineClose(socket);
+      inlineClose(sock);
     }
   }
 
+  private void waitForFutureTaskWithTimeout(Future submittedTask) {
+    if (this.asyncCloseWaitTime != 0) {
+      try {
+        submittedTask.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
+      } catch (InterruptedException | ExecutionException | TimeoutException e) 
{
+        // We want this code to wait at most 50ms for the close to happen.
+        // It is ok to ignore these exception and let the close continue
+        // in the background.
+      }
+    }
+  }
 
   /**
    * Closes the specified socket
-   * 
-   * @param socket the socket to close
+   * @param sock the socket to close
    */
-  private void inlineClose(final Socket socket) {
+
+  private static void inlineClose(final Socket sock) {
     // the next two statements are a mad attempt to fix bug
     // 36041 - segv in jrockit in pthread signaling code. This
     // seems to alleviate the problem.
     try {
-      socket.shutdownInput();
-      socket.shutdownOutput();
+      sock.shutdownInput();
+      sock.shutdownOutput();
     } catch (Exception e) {
     }
     try {
-      socket.close();
+      sock.close();
     } catch (IOException ignore) {
     } catch (VirtualMachineError err) {
       SystemFailure.initiateFailure(err);

http://git-wip-us.apache.org/repos/asf/geode/blob/821e03dc/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 954a33c..0ecb3bf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -562,7 +562,7 @@ public class Connection implements Runnable {
       } catch (IOException io) {
         logger.fatal(LocalizedMessage
             
.create(LocalizedStrings.Connection_UNABLE_TO_GET_P2P_CONNECTION_STREAMS), io);
-        t.getSocketCloser().asyncClose(socket, null);
+        t.getSocketCloser().asyncClose(socket, this.remoteAddr.toString(), 
null);
         throw io;
       }
     }
@@ -847,7 +847,7 @@ public class Connection implements Runnable {
         Socket s = this.socket;
         if (s != null && !s.isClosed()) {
           prepareForAsyncClose();
-          this.owner.getSocketCloser().asyncClose(s, null);
+          this.owner.getSocketCloser().asyncClose(s, 
String.valueOf(this.remoteAddr), null);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/geode/blob/821e03dc/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
index b6dbfe2..90315ce 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
@@ -72,11 +72,11 @@ public class SocketCloserJUnitTest {
     // They should all be stuck on countDownLatch.
     for (int i = 0; i < REMOTE_CLIENT_COUNT; i++) {
       Socket[] aSockets = new Socket[SOCKET_COUNT];
-
+      String address = i + "";
       for (int j = 0; j < SOCKET_COUNT; j++) {
         aSockets[j] = createClosableSocket();
         trackedSockets.add(aSockets[j]);
-        this.socketCloser.asyncClose(aSockets[j], () -> {
+        this.socketCloser.asyncClose(aSockets[j], address, () -> {
           try {
             waitingToClose.incrementAndGet();
             countDownLatch.await();
@@ -94,7 +94,7 @@ public class SocketCloserJUnitTest {
     // since a thread pool is doing to closes
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
       boolean areAllClosed = true;
-      for (Iterator<Socket> iterator = trackedSockets.iterator(); 
iterator.hasNext();) {
+      for (Iterator<Socket> iterator = trackedSockets.iterator(); 
iterator.hasNext(); ) {
         Socket socket = iterator.next();
         if (socket.isClosed()) {
           iterator.remove();
@@ -115,7 +115,7 @@ public class SocketCloserJUnitTest {
 
     Socket s = createClosableSocket();
     s.close();
-    this.socketCloser.asyncClose(s, () -> runnableCalled.set(true));
+    this.socketCloser.asyncClose(s, "A", () -> runnableCalled.set(true));
     Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> 
!runnableCalled.get());
   }
 
@@ -128,7 +128,7 @@ public class SocketCloserJUnitTest {
 
     final Socket closableSocket = createClosableSocket();
     this.socketCloser.close();
-    this.socketCloser.asyncClose(closableSocket, () -> 
runnableCalled.set(true));
+    this.socketCloser.asyncClose(closableSocket, "A", () -> 
runnableCalled.set(true));
     Awaitility.await().atMost(5, TimeUnit.SECONDS)
         .until(() -> runnableCalled.get() && closableSocket.isClosed());
   }

Reply via email to