Thread local pools never cleaned up

patch by Robert Stupp; reviewed by Jason Brown for CASSANDRA-13033


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7f668c6f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7f668c6f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7f668c6f

Branch: refs/heads/cassandra-3.X
Commit: 7f668c6fe117f892cd79863fb9805ea5d5a2823c
Parents: da94781
Author: Robert Stupp <sn...@snazy.de>
Authored: Mon Dec 12 20:28:31 2016 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Mon Dec 12 20:38:56 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../concurrent/NamedThreadFactory.java          | 24 +++++++++++-
 .../db/commitlog/AbstractCommitLogService.java  |  3 +-
 .../db/commitlog/CommitLogSegmentManager.java   |  3 +-
 .../cassandra/net/OutboundTcpConnection.java    | 41 +++++++++-----------
 .../apache/cassandra/repair/RepairRunnable.java |  4 +-
 .../scheduler/RoundRobinScheduler.java          |  3 +-
 .../cassandra/service/StorageService.java       |  7 ++--
 .../cassandra/streaming/ConnectionHandler.java  |  4 +-
 .../compress/CompressedInputStream.java         |  3 +-
 10 files changed, 59 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bbd47c1..5bc30be 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Thread local pools never cleaned up (CASSANDRA-13033)
  * Set RPC_READY to false when draining or if a node is marked as shutdown 
(CASSANDRA-12781)
  * CQL often queries static columns unnecessarily (CASSANDRA-12768)
  * Make sure sstables only get committed when it's safe to discard commit log 
records (CASSANDRA-12956)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java 
b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
index 33c80d5..22193c4 100644
--- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.concurrent;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import io.netty.util.concurrent.FastThreadLocal;
+import io.netty.util.concurrent.FastThreadLocalThread;
+
 /**
  * This class is an implementation of the <i>ThreadFactory</i> interface. This
  * is useful to give Java threads meaningful names which is useful when using
@@ -54,12 +57,29 @@ public class NamedThreadFactory implements ThreadFactory
 
     public Thread newThread(Runnable runnable)
     {
-        String name = id + ":" + n.getAndIncrement();
-        Thread thread = new Thread(threadGroup, runnable, name);
+        String name = id + ':' + n.getAndIncrement();
+        Thread thread = new FastThreadLocalThread(threadGroup, 
threadLocalDeallocator(runnable), name);
         thread.setPriority(priority);
         thread.setDaemon(true);
         if (contextClassLoader != null)
             thread.setContextClassLoader(contextClassLoader);
         return thread;
     }
+
+    /**
+     * Ensures that {@link FastThreadLocal#removeAll() FastThreadLocal.removeAll()} 
is called when the {@link Runnable#run()}
+     * method of the given {@link Runnable} instance completes to ensure 
cleanup of {@link FastThreadLocal} instances.
+     * This is especially important for direct byte buffers allocated locally 
for a thread.
+     */
+    public static Runnable threadLocalDeallocator(Runnable r)
+    {
+        return () ->
+        {
+            try {
+                r.run();
+            } finally {
+                FastThreadLocal.removeAll();
+            }
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java 
b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 113d1ba..e5a5887 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.db.commitlog;
 
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 import org.slf4j.*;
@@ -159,7 +160,7 @@ public abstract class AbstractCommitLogService
             }
         };
 
-        thread = new Thread(runnable, name);
+        thread = new 
Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name);
         thread.start();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 82cee50..79dd316 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -174,7 +175,7 @@ public class CommitLogSegmentManager
 
         run = true;
 
-        managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
+        managerThread = new 
Thread(NamedThreadFactory.threadLocalDeallocator(runnable), 
"COMMIT-LOG-ALLOCATOR");
         managerThread.start();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java 
b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index f573787..a9dfcdc 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -45,6 +45,7 @@ import net.jpountz.lz4.LZ4Compressor;
 import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.xxhash.XXHashFactory;
 
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
@@ -505,31 +506,27 @@ public class OutboundTcpConnection extends Thread
     {
         final AtomicInteger version = new AtomicInteger(NO_VERSION);
         final CountDownLatch versionLatch = new CountDownLatch(1);
-        new Thread("HANDSHAKE-" + poolReference.endPoint())
+        new Thread(NamedThreadFactory.threadLocalDeallocator(() ->
         {
-            @Override
-            public void run()
+            try
             {
-                try
-                {
-                    logger.info("Handshaking version with {}", 
poolReference.endPoint());
-                    version.set(inputStream.readInt());
-                }
-                catch (IOException ex)
-                {
-                    final String msg = "Cannot handshake version with " + 
poolReference.endPoint();
-                    if (logger.isTraceEnabled())
-                        logger.trace(msg, ex);
-                    else
-                        logger.info(msg);
-                }
-                finally
-                {
-                    //unblock the waiting thread on either success or fail
-                    versionLatch.countDown();
-                }
+                logger.info("Handshaking version with {}", 
poolReference.endPoint());
+                version.set(inputStream.readInt());
+            }
+            catch (IOException ex)
+            {
+                final String msg = "Cannot handshake version with " + 
poolReference.endPoint();
+                if (logger.isTraceEnabled())
+                    logger.trace(msg, ex);
+                else
+                    logger.info(msg);
+            }
+            finally
+            {
+                //unblock the waiting thread on either success or fail
+                versionLatch.countDown();
             }
-        }.start();
+        }),"HANDSHAKE-" + poolReference.endPoint()).start();
 
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java 
b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 7dd1b31..213e5c5 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -373,7 +373,7 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
 
     private Thread createQueryThread(final int cmd, final UUID sessionId)
     {
-        return new Thread(new WrappedRunnable()
+        return new Thread(NamedThreadFactory.threadLocalDeallocator(new 
WrappedRunnable()
         {
             // Query events within a time interval that overlaps the last by 
one second. Ignore duplicates. Ignore local traces.
             // Wake up upon local trace activity. Query when notified of trace 
activity with a timeout that doubles every two timeouts.
@@ -440,6 +440,6 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
                     seen[si].clear();
                 }
             }
-        });
+        }));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java 
b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
index c98c0fe..61dfa50 100644
--- a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
+++ b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.RequestSchedulerOptions;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
@@ -69,7 +70,7 @@ public class RoundRobinScheduler implements IRequestScheduler
                 }
             }
         };
-        Thread scheduler = new Thread(runnable, "REQUEST-SCHEDULER");
+        Thread scheduler = new 
Thread(NamedThreadFactory.threadLocalDeallocator(runnable), 
"REQUEST-SCHEDULER");
         scheduler.start();
         logger.info("Started the RoundRobin Request Scheduler");
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index d70c8dc..71cbc35 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.auth.AuthMigrationListener;
 import org.apache.cassandra.batchlog.BatchRemoveVerbHandler;
 import org.apache.cassandra.batchlog.BatchStoreVerbHandler;
 import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
@@ -613,7 +614,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         }
 
         // daemon threads, like our executors', continue to run while shutdown 
hooks are invoked
-        drainOnShutdown = new Thread(new WrappedRunnable()
+        drainOnShutdown = new 
Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
         {
             @Override
             public void runMayThrow() throws InterruptedException, 
ExecutionException, IOException
@@ -628,7 +629,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                 
logbackHook.setContext((LoggerContext)LoggerFactory.getILoggerFactory());
                 logbackHook.run();
             }
-        }, "StorageServiceShutdownHook");
+        }), "StorageServiceShutdownHook");
         Runtime.getRuntime().addShutdownHook(drainOnShutdown);
 
         replacing = DatabaseDescriptor.isReplacing();
@@ -3195,7 +3196,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             return 0;
 
         int cmd = nextRepairCommand.incrementAndGet();
-        new Thread(createRepairTask(cmd, keyspace, options, legacy)).start();
+        new 
Thread(NamedThreadFactory.threadLocalDeallocator(createRepairTask(cmd, 
keyspace, options, legacy))).start();
         return cmd;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java 
b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index d3d8ed2..b83c089 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -37,6 +37,8 @@ import com.google.common.util.concurrent.SettableFuture;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
@@ -214,7 +216,7 @@ public class ConnectionHandler
             this.socket = socket;
             this.protocolVersion = protocolVersion;
 
-            new Thread(this, name() + "-" + session.peer).start();
+            new Thread(NamedThreadFactory.threadLocalDeallocator(this), name() 
+ "-" + session.peer).start();
         }
 
         public ListenableFuture<?> close()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java 
b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index d59849f..6577980 100644
--- 
a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ 
b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -33,6 +33,7 @@ import com.google.common.primitives.Ints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.utils.ChecksumType;
 import org.apache.cassandra.utils.WrappedRunnable;
@@ -91,7 +92,7 @@ public class CompressedInputStream extends InputStream
         this.dataBuffer = new 
ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
         this.crcCheckChanceSupplier = crcCheckChanceSupplier;
 
-        new Thread(new Reader(source, info, dataBuffer)).start();
+        new Thread(NamedThreadFactory.threadLocalDeallocator(new 
Reader(source, info, dataBuffer))).start();
     }
 
     public int read() throws IOException

Reply via email to