Repository: incubator-geode Updated Branches: refs/heads/feature/GEODE-332 [created] 6d6c760cc
p2p readers and handshakers threads are now pooled Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/fa017689 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/fa017689 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/fa017689 Branch: refs/heads/feature/GEODE-332 Commit: fa017689ef73d9003eade82c6cb27634c03be05f Parents: 4e65f0c Author: Darrel Schneider <[email protected]> Authored: Tue Sep 15 16:03:40 2015 -0700 Committer: Darrel Schneider <[email protected]> Committed: Tue Sep 15 16:03:40 2015 -0700 ---------------------------------------------------------------------- .../gemfire/internal/tcp/Connection.java | 58 +++++++++++-------- .../gemfire/internal/tcp/ConnectionTable.java | 61 +++++++++++++++++++- 2 files changed, 93 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fa017689/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java index cd1b7dc..630ecfe 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java @@ -115,6 +115,11 @@ public class Connection implements Runnable { /** the table holding this connection */ final ConnectionTable owner; + + /** Set to false once run() is terminating. Using this instead of Thread.isAlive + * as the reader thread may be a pooled thread. + */ + private volatile boolean isRunning = false; /** true if connection is a shared resource that can be used by more than one thread */ private boolean sharedResource; @@ -136,11 +141,14 @@ public class Connection implements Runnable { } private final static ThreadLocal isReaderThread = new ThreadLocal(); - // return true if this thread is a reader thread public final static void makeReaderThread() { // mark this thread as a reader thread - isReaderThread.set(Boolean.TRUE); + makeReaderThread(true); } + private final static void makeReaderThread(boolean v) { + isReaderThread.set(v); + } + // return true if this thread is a reader thread public final static boolean isReaderThread() { Object o = isReaderThread.get(); if (o == null) { @@ -523,7 +531,7 @@ public class Connection implements Runnable { Connection c = new Connection(t, s); boolean readerStarted = false; try { - c.startReader(); + c.startReader(t); readerStarted = true; } finally { if (!readerStarted) { @@ -822,11 +830,12 @@ public class Connection implements Runnable { Runnable r = new Runnable() { public void run() { boolean rShuttingDown = readerShuttingDown; + final Thread localRef = readerThread; synchronized(stateLock) { - if (readerThread != null && readerThread.isAlive() && + if (localRef != null && isRunning && !rShuttingDown && connectionState == STATE_READING || connectionState == STATE_READING_ACK) { - readerThread.interrupt(); + localRef.interrupt(); } } } @@ -951,7 +960,7 @@ public class Connection implements Runnable { * * @throws IOException if handshake fails */ - private void attemptHandshake() throws IOException { + private void attemptHandshake(ConnectionTable connTable) throws IOException { // send HANDSHAKE // send this server's port. It's expected on the other side if (useNIO()) { @@ -961,7 +970,7 @@ public class Connection implements Runnable { handshakeStream(); } - startReader(); // this reader only reads the handshake and then exits + startReader(connTable); // this reader only reads the handshake and then exits waitForHandshake(); // waiting for reply } @@ -1099,7 +1108,7 @@ public class Connection implements Runnable { if (conn != null) { // handshake try { - conn.attemptHandshake(); + conn.attemptHandshake(t); if (conn.isSocketClosed()) { // something went wrong while reading the handshake // and the socket was closed or this guy sent us a @@ -1601,14 +1610,14 @@ public class Connection implements Runnable { this.owner.owner.getLocalId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) { isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor")); } - if (!beingSick && this.readerThread != null && !isIBM && this.readerThread.isAlive() + if (!beingSick && this.readerThread != null && !isIBM && this.isRunning && this.readerThread != Thread.currentThread()) { try { this.readerThread.join(500); - if (this.readerThread.isAlive() && !this.readerShuttingDown + if (this.isRunning && !this.readerShuttingDown && owner.getDM().getRootCause() == null) { // don't wait twice if there's a system failure this.readerThread.join(1500); - if (this.readerThread.isAlive()) { + if (this.isRunning) { logger.info(LocalizedMessage.create(LocalizedStrings.Connection_TIMED_OUT_WAITING_FOR_READERTHREAD_ON_0_TO_FINISH, this)); } } @@ -1677,26 +1686,22 @@ public class Connection implements Runnable { } /** starts a reader thread */ - private void startReader() { - ThreadGroup group = - LoggingThreadGroup.createThreadGroup("P2P Reader Threads", logger); - Assert.assertTrue(this.readerThread == null); - this.readerThread = - new Thread(group, this, p2pReaderName()); - this.readerThread.setDaemon(true); - stopped = false; - this.readerThread.start(); - } + private void startReader(ConnectionTable connTable) { + Assert.assertTrue(!this.isRunning); + stopped = false; + this.isRunning = true; + connTable.executeCommand(this); + } /** in order to read non-NIO socket-based messages we need to have a thread actively trying to grab bytes out of the sockets input queue. This is that thread. */ public void run() { + this.readerThread = Thread.currentThread(); + this.readerThread.setName(p2pReaderName()); ConnectionTable.threadWantsSharedResources(); - if (this.isReceiver) { - makeReaderThread(); - } + makeReaderThread(this.isReceiver); try { if (useNIO()) { runNioReader(); @@ -1725,6 +1730,9 @@ public class Connection implements Runnable { // for the handshake. // see bug 37524 for an example of listeners hung in waitForHandshake notifyHandshakeWaiter(false); + this.isRunning = false; + this.readerThread.setName("idle p2p reader"); + this.readerThread = null; } // finally } @@ -3307,7 +3315,7 @@ public class Connection implements Runnable { protected Object stateLock = new Object(); /** for timeout processing, this is the current state of the connection */ - protected byte connectionState; + protected byte connectionState = STATE_IDLE; /*~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~*/ /** the connection is idle, but may be in use */ http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fa017689/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java index 9beb947..525c687 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java @@ -19,8 +19,15 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.logging.log4j.Logger; @@ -37,6 +44,7 @@ import com.gemstone.gemfire.internal.SocketCreator; import com.gemstone.gemfire.internal.SystemTimer; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.logging.LoggingThreadGroup; import com.gemstone.gemfire.internal.logging.log4j.AlertAppender; import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; @@ -129,7 +137,13 @@ public class ConnectionTable { */ private volatile boolean closed = false; - + /** + * Executor used by p2p reader and p2p handshaker threads. + */ + private Executor p2pReaderThreadPool; + /** Number of seconds to wait before timing out an unused p2p reader thread. Default is 120 (2 minutes). */ + private final static long READER_POOL_KEEP_ALIVE_TIME = Long.getLong("p2p.READER_POOL_KEEP_ALIVE_TIME", 120).longValue(); + /** * The most recent instance to be created * @@ -202,11 +216,40 @@ public class ConnectionTable { this.threadOrderedConnMap = new ThreadLocal(); this.threadConnMaps = new ArrayList(); this.threadConnectionMap = new ConcurrentHashMap(); + this.p2pReaderThreadPool = createThreadPoolForIO(c.getDM().getSystem().isShareSockets()); /* NOMUX: if (TCPConduit.useNIO) { inputMuxManager = new InputMuxManager(this); inputMuxManager.start(c.logger); }*/ } + + private Executor createThreadPoolForIO(boolean conserveSockets) { + Executor executor = null; + final ThreadGroup connectionRWGroup = LoggingThreadGroup.createThreadGroup("P2P Reader Threads", logger); + if (conserveSockets) { + executor = new Executor() { + @Override + public void execute(Runnable command) { + Thread th = new Thread(connectionRWGroup, command); + th.setDaemon(true); + th.start(); + } + }; + } + else { + BlockingQueue synchronousQueue = new SynchronousQueue(); + ThreadFactory tf = new ThreadFactory() { + public Thread newThread(final Runnable command) { + Thread thread = new Thread(connectionRWGroup, command); + thread.setDaemon(true); + return thread; + } + }; + executor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, READER_POOL_KEEP_ALIVE_TIME, + TimeUnit.SECONDS, synchronousQueue, tf); + } + return executor; + } /** conduit sends connected() after establishing the server socket */ // protected void connected() { @@ -715,6 +758,15 @@ public class ConnectionTable { this.threadConnMaps.clear(); } } + { + Executor localExec = this.p2pReaderThreadPool; + if (localExec != null) { + if (localExec instanceof ExecutorService) { + ((ExecutorService)localExec).shutdown(); + } + this.p2pReaderThreadPool = null; + } + } closeReceivers(false); Map m = (Map)this.threadOrderedConnMap.get(); @@ -726,6 +778,13 @@ public class ConnectionTable { } } + public void executeCommand(Runnable runnable) { + Executor local = this.p2pReaderThreadPool; + if (local != null) { + local.execute(runnable); + } + } + /** * Close all receiving threads. This is used during shutdown and is also * used by a test hook that makes us deaf to incoming messages.
