Socket asyncClose now runs on a shared thread pool instead of starting a new thread for each close
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/6d6c760c Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/6d6c760c Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/6d6c760c Branch: refs/heads/feature/GEODE-332 Commit: 6d6c760ccb17d4bd0d683072f1d79dc895926c27 Parents: fa01768 Author: Darrel Schneider <[email protected]> Authored: Tue Sep 15 16:47:17 2015 -0700 Committer: Darrel Schneider <[email protected]> Committed: Tue Sep 15 16:47:17 2015 -0700 ---------------------------------------------------------------------- .../gemfire/internal/SocketCreator.java | 69 +++++++++++++------- .../gemfire/internal/tcp/ConnectionTable.java | 1 + 2 files changed, 48 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6d6c760c/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java index ff4a22c..0688c3d 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java @@ -81,6 +81,11 @@ import com.gemstone.gemfire.internal.util.PasswordUtil; import com.gemstone.org.jgroups.util.ConnectionWatcher; import java.util.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.net.ssl.*; @@ -1197,6 +1202,40 @@ public class SocketCreator implements com.gemstone.org.jgroups.util.SockCreator return (String[]) v.toArray( new 
String[ v.size() ] ); } + /** thread pool of async close threads */ + private static ThreadPoolExecutor asyncCloseExecutor; + /** Number of seconds to wait before timing out an unused async close thread. Default is 120 (2 minutes). */ + private final static long ASYNC_CLOSE_POOL_KEEP_ALIVE_TIME = Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_TIME", 120).longValue(); + + private static synchronized ThreadPoolExecutor getAsyncThreadExecutor() { + ThreadPoolExecutor pool = asyncCloseExecutor; + if (pool == null) { + final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger); + ThreadFactory tf = new ThreadFactory() { + public Thread newThread(final Runnable command) { + Thread thread = new Thread(tg, command); + thread.setDaemon(true); + return thread; + } + }; + BlockingQueue synchronousQueue = new SynchronousQueue(); + pool = new ThreadPoolExecutor(1, Integer.MAX_VALUE, ASYNC_CLOSE_POOL_KEEP_ALIVE_TIME, TimeUnit.SECONDS, synchronousQueue, tf); + asyncCloseExecutor = pool; + } + return pool; + } + public static synchronized void closeAsyncThreadExecutor() { + ThreadPoolExecutor pool = asyncCloseExecutor; + if (pool != null) { + pool.shutdownNow(); + asyncCloseExecutor = null; + } + } + private static synchronized void asyncExecute(Runnable r) { + // The old code waited 50ms for the async task to complete. + // Should this code use submit on the executor and also wait 50ms? + getAsyncThreadExecutor().execute(r); + } /** * Closes the specified socket in a background thread and waits a limited * amount of time for the close to complete. 
In some cases we see close @@ -1206,44 +1245,30 @@ public class SocketCreator implements com.gemstone.org.jgroups.util.SockCreator * @param who who the socket is connected to * @param extra an optional Runnable with stuff to execute in the async thread */ - public static void asyncClose(final Socket sock, String who, final Runnable extra) { + public static void asyncClose(final Socket sock, final String who, final Runnable extra) { if (sock == null || sock.isClosed()) { return; } try { - ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger); - - Thread t = new Thread(tg, new Runnable() { + asyncExecute(new Runnable() { public void run() { + Thread.currentThread().setName("AsyncSocketCloser for " + who); + try { if (extra != null) { extra.run(); } inlineClose(sock); + } finally { + Thread.currentThread().setName("unused AsyncSocketCloser"); + } } - }, "AsyncSocketCloser for " + who); - t.setDaemon(true); - try { - t.start(); + }); } catch (OutOfMemoryError ignore) { // If we can't start a thread to close the socket just do it inline. // See bug 50573. inlineClose(sock); return; } - try { - // [bruce] if the network fails, this will wait the full amount of time - // on every close, so it must be kept very short. 
it was 750ms before, - // causing frequent hangs in net-down hydra tests - t.join(50/*ms*/); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } - catch (VirtualMachineError e) { - SystemFailure.initiateFailure(e); - // NOTREACHED - throw e; - } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6d6c760c/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java index 525c687..84ea1eb 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java @@ -776,6 +776,7 @@ public class ConnectionTable { m.clear(); } } + SocketCreator.closeAsyncThreadExecutor(); } public void executeCommand(Runnable runnable) {
