Author: kwin
Date: Wed Apr 12 08:43:20 2017
New Revision: 1791091
URL: http://svn.apache.org/viewvc?rev=1791091&view=rev
Log:
SLING-6261 clean up thread locals for threads being reused in a thread pool
Added:
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalChangeListener.java
(with props)
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java
(with props)
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java
- copied, changed from r1790530,
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPool.java
sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocalsTest.java
- copied, changed from r1790530,
sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPoolTest.java
Removed:
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPool.java
sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPoolTest.java
Modified:
sling/trunk/bundles/commons/threads/pom.xml
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ModifiableThreadPoolConfig.java
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ThreadPoolConfig.java
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPool.java
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPoolManager.java
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolMBeanImpl.java
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/jmx/ThreadPoolMBean.java
Modified: sling/trunk/bundles/commons/threads/pom.xml
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/pom.xml?rev=1791091&r1=1791090&r2=1791091&view=diff
==============================================================================
--- sling/trunk/bundles/commons/threads/pom.xml (original)
+++ sling/trunk/bundles/commons/threads/pom.xml Wed Apr 12 08:43:20 2017
@@ -88,5 +88,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.5</version>
+ </dependency>
</dependencies>
</project>
Modified:
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ModifiableThreadPoolConfig.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ModifiableThreadPoolConfig.java?rev=1791091&r1=1791090&r2=1791091&view=diff
==============================================================================
---
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ModifiableThreadPoolConfig.java
(original)
+++
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ModifiableThreadPoolConfig.java
Wed Apr 12 08:43:20 2017
@@ -72,8 +72,12 @@ public final class ModifiableThreadPoolC
/** The queue size */
private int queueSize = -1;
- /** Max age of a thread in milliseconds */
- private long maxThreadAge = TimeUnit.MINUTES.toMillis(5);
+ /** Max age of a thread in milliseconds
+ * @deprecated Since version 3.4.0 always returns -1 as threads are no
longer retired
+ * but instead the thread locals are cleaned up (<a
href="https://issues.apache.org/jira/browse/SLING-6261">SLING-6261</a>)
+ */
+ @Deprecated
+ private long maxThreadAge = -1;
/** The keep alive time. */
private long keepAliveTime = 60000L;
@@ -169,9 +173,11 @@ public final class ModifiableThreadPoolC
}
- /**
+ /*
+ * (non-Javadoc)
* @see org.apache.sling.commons.threads.ThreadPoolConfig#getMaxThreadAge()
*/
+ @Override
public long getMaxThreadAge() {
return maxThreadAge;
}
@@ -179,6 +185,8 @@ public final class ModifiableThreadPoolC
/**
* Set the max thread age.
* @param maxThreadAge New max thread age in milliseconds.
+ * @deprecated Since version 3.4.0 should no longer be called, as threads
are no longer retired
+ * but instead the thread locals are cleaned up (<a
href="https://issues.apache.org/jira/browse/SLING-6261">SLING-6261</a>)
*/
public void setMaxThreadAge(final long maxThreadAge) {
this.maxThreadAge = maxThreadAge;
Modified:
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ThreadPoolConfig.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ThreadPoolConfig.java?rev=1791091&r1=1791090&r2=1791091&view=diff
==============================================================================
---
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ThreadPoolConfig.java
(original)
+++
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ThreadPoolConfig.java
Wed Apr 12 08:43:20 2017
@@ -61,7 +61,10 @@ public interface ThreadPoolConfig {
/**
* Return the maximum age before a thread is retired.
* @return The maximum age of a thread in milliseconds.
+ * @deprecated Since version 3.4.0 always returns -1 as threads are no
longer retired
+ * but instead the thread locals are cleaned up (<a
href="https://issues.apache.org/jira/browse/SLING-6261">SLING-6261</a>)
*/
+ @Deprecated
long getMaxThreadAge();
/**
Modified:
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPool.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPool.java?rev=1791091&r1=1791090&r2=1791091&view=diff
==============================================================================
---
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPool.java
(original)
+++
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPool.java
Wed Apr 12 08:43:20 2017
@@ -132,19 +132,35 @@ public class DefaultThreadPool
handler = new ThreadPoolExecutor.CallerRunsPolicy();
break;
}
-
- this.executor = new
ThreadExpiringThreadPool(this.configuration.getMinPoolSize(),
- this.configuration.getMaxPoolSize(),
- this.configuration.getMaxThreadAge(),
- TimeUnit.MILLISECONDS,
- this.configuration.getKeepAliveTime(),
- TimeUnit.MILLISECONDS,
- queue,
- threadFactory,
- handler);
+ try {
+ this.executor = new
ThreadPoolExecutorCleaningThreadLocals(this.configuration.getMinPoolSize(),
+ this.configuration.getMaxPoolSize(),
+ this.configuration.getKeepAliveTime(),
+ TimeUnit.MILLISECONDS,
+ queue,
+ threadFactory,
+ handler,
+ new LoggingThreadLocalChangeListener());
+ } catch (IllegalStateException e) {
+ logger.warn("Unsupported JRE, cannot register
ThreadPoolExecutorCleaningThreadLocals due to '{}', fall back to regular
ThreadPoolExecutor", e.getMessage(), e);
+ this.executor = new
ThreadPoolExecutor(this.configuration.getMinPoolSize(),
+ this.configuration.getMaxPoolSize(),
+ this.configuration.getKeepAliveTime(),
+ TimeUnit.MILLISECONDS,
+ queue,
+ threadFactory,
+ handler);
+ }
this.logger.info("Thread pool [{}] initialized.", name);
}
+ private static class LoggingThreadLocalChangeListener implements
ThreadLocalChangeListener {
+ @Override
+ public void changed(Mode mode, Thread thread, ThreadLocal<?>
threadLocal, Object value) {
+ LOGGER.debug("Thread '{}' {} ThreadLocal {} with value {}",
thread, mode, threadLocal.getClass(), value);
+ }
+ }
+
/**
* @see org.apache.sling.commons.threads.ThreadPool#getName()
*/
Modified:
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPoolManager.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPoolManager.java?rev=1791091&r1=1791090&r2=1791091&view=diff
==============================================================================
---
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPoolManager.java
(original)
+++
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPoolManager.java
Wed Apr 12 08:43:20 2017
@@ -98,9 +98,6 @@ public class DefaultThreadPoolManager
if ( props.get(ModifiableThreadPoolConfig.PROPERTY_QUEUE_SIZE) != null
) {
config.setQueueSize((Integer)props.get(ModifiableThreadPoolConfig.PROPERTY_QUEUE_SIZE));
}
- if ( props.get(ModifiableThreadPoolConfig.PROPERTY_MAX_THREAD_AGE) !=
null ) {
- config.setMaxThreadAge((Long)
props.get(ModifiableThreadPoolConfig.PROPERTY_MAX_THREAD_AGE));
- }
if ( props.get(ModifiableThreadPoolConfig.PROPERTY_KEEP_ALIVE_TIME) !=
null ) {
config.setKeepAliveTime((Long)props.get(ModifiableThreadPoolConfig.PROPERTY_KEEP_ALIVE_TIME));
}
Added:
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalChangeListener.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalChangeListener.java?rev=1791091&view=auto
==============================================================================
---
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalChangeListener.java
(added)
+++
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalChangeListener.java
Wed Apr 12 08:43:20 2017
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sling.commons.threads.impl;
+
+import java.lang.invoke.MethodHandles;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Interface for listeners being attached to {@link ThreadLocalCleaner}.
+ */
+public interface ThreadLocalChangeListener {
+
+ Logger LOGGER =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ void changed(Mode mode, Thread thread, ThreadLocal<?> threadLocal, Object
value);
+
+ enum Mode {
+ ADDED, REMOVED
+ }
+}
Propchange:
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalChangeListener.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java?rev=1791091&view=auto
==============================================================================
---
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java
(added)
+++
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java
Wed Apr 12 08:43:20 2017
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sling.commons.threads.impl;
+
+import java.lang.ref.Reference;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+import org.apache.sling.commons.threads.impl.ThreadLocalChangeListener.Mode;
+
+/** Notifies a {@link ThreadLocalChangeListener} about changes on a thread
local storage. In addition it removes all references to variables
+ * being added to the thread local storage while the cleaner was running with
its {@link cleanup} method.
+ *
+ * @see <a
href="http://www.javaspecialists.eu/archive/Issue229.html">JavaSpecialist.eu -
Cleaning ThreadLocals</a> */
+public class ThreadLocalCleaner {
+ private final ThreadLocalChangeListener listener;
+
+ /* Reflection fields */
+ /** this field is in class {@link ThreadLocal} and is of type {@code
ThreadLocal.ThreadLocalMap} */
+ private static Field threadLocalsField;
+ /** this field is in class {@link ThreadLocal} and is of type {@code
ThreadLocal.ThreadLocalMap} */
+ private static Field inheritableThreadLocalsField;
+ private static Class<?> threadLocalMapClass;
+ /** this field is in class {@code ThreadLocal.ThreadLocalMap} and contains
an array of {@code ThreadLocal.ThreadLocalMap.Entry's} */
+ private static Field tableField;
+ private static Class<?> threadLocalMapEntryClass;
+ /** this field is in class {@code ThreadLocal.ThreadLocalMap.Entry} and
contains an object referencing the actual thread local
+ * variable */
+ private static Field threadLocalEntryValueField;
+ private static IllegalStateException reflectionException;
+
+ public ThreadLocalCleaner(ThreadLocalChangeListener listener) {
+ if (threadLocalsField == null) {
+ initReflectionFields();
+ }
+ this.listener = listener;
+ saveOldThreadLocals();
+ }
+
+ private static synchronized void initReflectionFields() throws
IllegalStateException {
+ // check if previous initialization lead to an exception
+ if (reflectionException != null) {
+ throw reflectionException;
+ }
+ // check if initialized
+ if (threadLocalsField == null) {
+ try {
+ threadLocalsField = field(Thread.class, "threadLocals");
+ inheritableThreadLocalsField = field(Thread.class,
"inheritableThreadLocals");
+ threadLocalMapClass = inner(ThreadLocal.class,
"ThreadLocalMap");
+ tableField = field(threadLocalMapClass, "table");
+ threadLocalMapEntryClass = inner(threadLocalMapClass, "Entry");
+ threadLocalEntryValueField = field(threadLocalMapEntryClass,
"value");
+ } catch (NoSuchFieldException e) {
+ reflectionException = new IllegalStateException(
+ "Could not locate threadLocals field in class Thread.
" +
+ "Will not be able to clear thread locals: " +
e);
+ throw reflectionException;
+ }
+ }
+ }
+
+ public void cleanup() {
+ // the first two diff calls are only to notify the listener, the
actual cleanup is done by restoreOldThreadLocals
+ diff(threadLocalsField, copyOfThreadLocals.get());
+ diff(inheritableThreadLocalsField,
copyOfInheritableThreadLocals.get());
+ restoreOldThreadLocals();
+ }
+
+ /** Notifies the {@link ThreadLocalChangeListener} about changes on thread
local variables for the current thread.
+ *
+ * @param field
+ * @param backup */
+ private void diff(Field field, Reference<?>[] backup) {
+ try {
+ Thread thread = Thread.currentThread();
+ Object threadLocals = field.get(thread);
+ if (threadLocals == null) {
+ if (backup != null) {
+ for (Reference<?> reference : backup) {
+ changed(thread, reference, Mode.REMOVED);
+ }
+ }
+ return;
+ }
+
+ Reference<?>[] current = (Reference<?>[])
tableField.get(threadLocals);
+ if (backup == null) {
+ for (Reference<?> reference : current) {
+ changed(thread, reference, Mode.ADDED);
+ }
+ } else {
+ // nested loop - both arrays *should* be relatively small
+ next: for (Reference<?> curRef : current) {
+ if (curRef != null) {
+ if (curRef.get() == copyOfThreadLocals ||
+ curRef.get() == copyOfInheritableThreadLocals)
{
+ continue next;
+ }
+ for (Reference<?> backupRef : backup) {
+ if (curRef == backupRef)
+ continue next;
+ }
+ // could not find it in backup - added
+ changed(thread, curRef, Mode.ADDED);
+ }
+ }
+ next: for (Reference<?> backupRef : backup) {
+ for (Reference<?> curRef : current) {
+ if (curRef == backupRef)
+ continue next;
+ }
+ // could not find it in current - removed
+ changed(thread, backupRef, Mode.REMOVED);
+ }
+ }
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException("Access denied", e);
+ }
+ }
+
+ private void changed(Thread thread, Reference<?> reference,
+ ThreadLocalChangeListener.Mode mode)
+ throws IllegalAccessException {
+ listener.changed(mode,
+ thread, (ThreadLocal<?>) reference.get(),
+ threadLocalEntryValueField.get(reference));
+ }
+
+ /** @param c the class containing the field
+ * @param name the name of the field
+ * @return the field from the given class with the given name (made
accessible)
+ * @throws NoSuchFieldException */
+ private static Field field(Class<?> c, String name)
+ throws NoSuchFieldException {
+ Field field = c.getDeclaredField(name);
+ field.setAccessible(true);
+ return field;
+ }
+
+ /** @param clazz the class containing the inner class
+ * @param name the name of the inner class
+ * @return the class with the given name, declared as inner class of the
given class */
+ private static Class<?> inner(Class<?> clazz, String name) {
+ for (Class<?> c : clazz.getDeclaredClasses()) {
+ if (c.getSimpleName().equals(name)) {
+ return c;
+ }
+ }
+ throw new IllegalStateException(
+ "Could not find inner class " + name + " in " + clazz);
+ }
+
+ private static final ThreadLocal<Reference<?>[]> copyOfThreadLocals = new
ThreadLocal<>();
+
+ private static final ThreadLocal<Reference<?>[]>
copyOfInheritableThreadLocals = new ThreadLocal<>();
+
+ private static void saveOldThreadLocals() {
+ copyOfThreadLocals.set(copy(threadLocalsField));
+ copyOfInheritableThreadLocals.set(copy(inheritableThreadLocalsField));
+ }
+
+ private static Reference<?>[] copy(Field field) {
+ try {
+ Thread thread = Thread.currentThread();
+ Object threadLocals = field.get(thread);
+ if (threadLocals == null)
+ return null;
+ Reference<?>[] table = (Reference<?>[])
tableField.get(threadLocals);
+ return Arrays.copyOf(table, table.length);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException("Access denied", e);
+ }
+ }
+
+ private static void restoreOldThreadLocals() {
+ try {
+ restore(inheritableThreadLocalsField,
copyOfInheritableThreadLocals.get());
+ restore(threadLocalsField, copyOfThreadLocals.get());
+ } finally {
+ copyOfThreadLocals.remove();
+ copyOfInheritableThreadLocals.remove();
+ }
+ }
+
+ private static void restore(Field field, Object value) {
+ try {
+ Thread thread = Thread.currentThread();
+ if (value == null) {
+ field.set(thread, null);
+ } else {
+ tableField.set(field.get(thread), value);
+ }
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException("Access denied", e);
+ }
+ }
+
+ static {
+ // TODO: move to a place where the exception can be caught!
+
+ }
+}
\ No newline at end of file
Propchange:
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Copied:
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java
(from r1790530,
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPool.java)
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java?p2=sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java&p1=sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPool.java&r1=1790530&r2=1791091&rev=1791091&view=diff
==============================================================================
---
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPool.java
(original)
+++
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java
Wed Apr 12 08:43:20 2017
@@ -16,131 +16,50 @@
*/
package org.apache.sling.commons.threads.impl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.lang.invoke.MethodHandles;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-/**
- * An extension of ThreadPoolExecutor, which keeps track of the age
- * of the worker threads and expires them when they get older than
- * a specified max-age.
- * <br/>
- * To be precise, a thread is expired when it finishes processing
- * a task and its max-age has been exceeded at that time. I.e. if a
- * thread is idle past its expiry, it may still process a single
- * task before it is expired.
- */
-public class ThreadExpiringThreadPool extends ThreadPoolExecutor {
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
- private static final Logger LOG =
LoggerFactory.getLogger(ThreadExpiringThreadPool.class);
+/**
+ * An extension of ThreadPoolExecutor which automatically gets rid of all
{@link ThreadLocal} references before a thread is going to be reused.
+ * @see ThreadLocalCleaner
+ */
+public class ThreadPoolExecutorCleaningThreadLocals extends ThreadPoolExecutor
{
+ private final ThreadLocalChangeListener listener;
- /**
- * Map from thread-id to the time (in milliseconds) when a thread was
first used to
- * process a task. This is used to look determine when a thread is to be
expired.
- */
- private final ConcurrentHashMap<Long, Long> threadStartTimes;
-
- /**
- * Thread max-age in milliseconds.
- */
- private final long maxThreadAge;
-
- /**
- * Convenience flag indicating whether threads expire or not.
- * This is equivalent to {@code maxThreadAge >= 0}.
- */
- private final boolean enableThreadExpiry;
-
- /**
- * Marker exception object thrown to terminate threads that have
- * reached or exceeded their max-age. This exception is intentionally
- * used for (minimal) control flow, i.e. the {@code ThreadPoolExecutor}
- * will dispose of any thread that threw an exception and create a new
- * one in its stead. This exception should never show up in any logs,
- * otherwise it is a bug.
- */
- private final RuntimeException expiredThreadException;
-
- public ThreadExpiringThreadPool(
- final int corePoolSize,
- final int maximumPoolSize,
- final long maxThreadAge,
- final TimeUnit maxThreadAgeUnit,
- final long keepAliveTime,
- final TimeUnit keepAliveTimeUnit,
- final BlockingQueue<Runnable> workQueue,
- final ThreadFactory threadFactory,
- final RejectedExecutionHandler handler
- ) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, keepAliveTimeUnit,
workQueue, threadFactory, handler);
- this.threadStartTimes = new ConcurrentHashMap<Long,
Long>(maximumPoolSize);
- this.maxThreadAge = TimeUnit.MILLISECONDS.convert(maxThreadAge,
maxThreadAgeUnit);
- this.enableThreadExpiry = maxThreadAge >= 0;
- this.expiredThreadException = new RuntimeException("Kill old thread");
- }
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- @Override
- protected void beforeExecute(final Thread thread, final Runnable runnable)
{
- if (enableThreadExpiry) {
- recordStartTime(thread);
- }
- super.beforeExecute(thread, runnable);
+ public ThreadPoolExecutorCleaningThreadLocals(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler handler,
+ ThreadLocalChangeListener listener) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
+ workQueue, threadFactory, handler);
+ this.listener = listener;
}
- private void recordStartTime(final Thread thread) {
- final long threadId = thread.getId();
- if (threadStartTimes.putIfAbsent(threadId, System.currentTimeMillis())
== null) {
- LOG.debug("{} used for the first time.", thread);
-
- // The uncaught exception handler makes sure that the exception
- // signalling the death of a thread is swallowed. All other
- // Throwables are handed to the originalHandler.
- final Thread.UncaughtExceptionHandler originalHandler =
thread.getUncaughtExceptionHandler();
- thread.setUncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(final Thread thread, final
Throwable throwable) {
- // first reset the original uncaught exception handler -
just as a precaution
- thread.setUncaughtExceptionHandler(originalHandler);
-
- // ignore expected exception thrown to terminate the thread
- if (throwable == expiredThreadException) {
- return;
- }
-
- // delegate any other exceptions to the original uncaught
exception handler
- if (originalHandler != null) {
- originalHandler.uncaughtException(thread, throwable);
- }
- }
- });
- }
- }
+ private static final ThreadLocal<ThreadLocalCleaner> local = new
ThreadLocal<>();
- @Override
- protected void afterExecute(final Runnable runnable, final Throwable
throwable) {
- super.afterExecute(runnable, throwable);
- if (throwable == null && enableThreadExpiry) {
- checkMaxThreadAge(Thread.currentThread());
- }
+ protected void beforeExecute(Thread t, Runnable r) {
+ LOGGER.debug("Collecting changes to ThreadLocal for thread {} from now
on...", t);
+ ThreadLocalCleaner cleaner = new ThreadLocalCleaner(listener);
+ local.set(cleaner);
}
- private void checkMaxThreadAge(final Thread thread) {
- final long now = System.currentTimeMillis();
- final long threadId = thread.getId();
- final Long started = threadStartTimes.get(threadId);
- if (started != null && now >= started + maxThreadAge) {
- final long delta = now - (started + maxThreadAge);
- LOG.debug("{} exceeded its max age by {}ms and will be replaced.",
thread, delta);
- threadStartTimes.remove(threadId);
-
- // throw marker exception to kill this thread and thus trigger
creation of a new one
- throw expiredThreadException;
- }
+ protected void afterExecute(Runnable r, Throwable t) {
+ LOGGER.debug("Cleaning up thread locals for thread {}...",
Thread.currentThread());
+ ThreadLocalCleaner cleaner = local.get();
+ local.remove();
+ cleaner.cleanup();
}
}
Modified:
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolMBeanImpl.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolMBeanImpl.java?rev=1791091&r1=1791090&r2=1791091&view=diff
==============================================================================
---
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolMBeanImpl.java
(original)
+++
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolMBeanImpl.java
Wed Apr 12 08:43:20 2017
@@ -100,6 +100,7 @@ class ThreadPoolMBeanImpl extends Standa
}
}
+ @Deprecated
public long getMaxThreadAge() {
return this.entry.getConfig().getMaxThreadAge();
}
Modified:
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/jmx/ThreadPoolMBean.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/jmx/ThreadPoolMBean.java?rev=1791091&r1=1791090&r2=1791091&view=diff
==============================================================================
---
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/jmx/ThreadPoolMBean.java
(original)
+++
sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/jmx/ThreadPoolMBean.java
Wed Apr 12 08:43:20 2017
@@ -86,7 +86,10 @@ public interface ThreadPoolMBean {
* Return the configured max thread age.
*
* @return The configured max thread age.
+ * @deprecated Since version 1.1.1 always returns -1 as threads are no
longer retired
+ * but instead the thread locals are cleaned up (<a
href="https://issues.apache.org/jira/browse/SLING-6261">SLING-6261</a>)
*/
+ @Deprecated
long getMaxThreadAge();
/**
Copied:
sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocalsTest.java
(from r1790530,
sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPoolTest.java)
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocalsTest.java?p2=sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocalsTest.java&p1=sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPoolTest.java&r1=1790530&r2=1791091&rev=1791091&view=diff
==============================================================================
---
sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPoolTest.java
(original)
+++
sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocalsTest.java
Wed Apr 12 08:43:20 2017
@@ -16,402 +16,77 @@
*/
package org.apache.sling.commons.threads.impl;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExternalResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static java.util.Arrays.asList;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class ThreadExpiringThreadPoolTest {
-
- private static final Logger LOG =
LoggerFactory.getLogger(ThreadExpiringThreadPoolTest.class);
-
- private static final int MAX_THREAD_AGE_MS = 90; // let threads expire
after this many ms
-
- @Rule
- public ThreadPoolContext context = new ThreadPoolContext();
-
-
- /**
- * Attempts to isolate failures that happen > 0.2% of the time related to
the
- * way in which the underlying thread pool behaves. This is not normally
run as
- * a test , but use it if you want to isolate a rare failure.
- */
- //@Test
- public void shouldLetMultipleThreadsDieAfterExpiryMulti() {
- int fail = 0;
- int success = 0;
- for (int i = 0; i < 500; i++) {
- try {
- LOG.info("Running {} ", i);
- context = new ThreadPoolContext();
- context.before();
- shouldLetMultipleThreadsDieAfterExpiry();
- success++;
- } catch (Throwable e) {
- LOG.error("Failed ", e);
- fail++;
- fail("Race condition encountered");
- } finally {
- context.after();
- }
- }
- LOG.info("Failed {} sucess {}", fail, success);
- assertEquals(0, fail);
- }
- /**
- * Attempts to isolate failures that happen > 0.2% of the time related to
the
- * way in which the underlying thread pool behaves. This is not normally
run as
- * a test, but use it if you want to isolate a rare failure.
- */
- // @Test
- public void shouldCreateNewThreadAfterExpiryMulti() {
-
- int fail = 0;
- int success = 0;
- for (int i = 0; i < 500; i++) {
- try {
- LOG.info("Running {} ", i);
- context = new ThreadPoolContext();
- context.before();
- shouldCreateNewThreadAfterExpiry();
- success++;
- } catch (Throwable e ) {
- LOG.error("Failed ",e);
- fail++;
- fail("Race condition encountered");
- } finally {
- context.after();
- }
- }
- LOG.info("Failed {} sucess {}", fail, success);
- assertEquals(0, fail);
- }
- /**
- * Attempts to isolate failures that happen > 0.2% of the time related to
the
- * way in which the underlying thread pool behaves. This is not normally
run as
- * a test, but use it if you want to isolate a rare failure.
- */
- // @Test
- public void shouldCreateNewThreadAfterExpiryForFailingTasksMulti() {
-
- int fail = 0;
- int success = 0;
- for (int i = 0; i < 500; i++) {
- try {
- LOG.info("Running {} ", i);
- context = new ThreadPoolContext();
- context.before();
- shouldCreateNewThreadAfterExpiryForFailingTasks();
- success++;
- } catch (Throwable e ) {
- LOG.error("Failed ",e);
- fail++;
- fail("Race condition encountered");
- } finally {
- context.after();
- }
- }
- LOG.info("Failed {} sucess {}", fail, success);
- assertEquals(0, fail);
-
- }
-
-
- @Test
- public void shouldCreateNewThreadAfterExpiry() throws
InterruptedException, ExecutionException {
- final TrackingThreadFactory threadFactory = context.getThreadFactory();
- final ThreadExpiringThreadPool pool = context.getPool();
-
- assertThat(threadFactory.getThreadCount(), is(0));
-
- assertExecutionByThread(pool, "test-thread-0");
- assertExecutionByThread(pool, "test-thread-0");
- assertExecutionByThread(pool, "test-thread-0");
- assertThat(threadFactory.getThreadCount(), is(1));
-
- letThreadsDie();
-
- // thread executes one more task after expiring
- assertExecutionByThread(pool, "test-thread-0");
- assertExecutionByThread(pool, "test-thread-1");
- assertThat(threadFactory.getThreadCount(), is(2));
- assertActiveThreads(threadFactory, "test-thread-1");
- assertExpiredThreads(threadFactory, "test-thread-0");
- }
-
- @Test
- public void shouldCreateNewThreadAfterExpiryForFailingTasks() throws
InterruptedException, ExecutionException {
- final TrackingThreadFactory threadFactory = context.getThreadFactory();
- final ThreadExpiringThreadPool pool = context.getPool();
-
- assertThat(threadFactory.getThreadCount(), is(0));
-
- assertFailingSubmitThreadName(pool, "test-thread-0");
- assertFailingSubmitThreadName(pool, "test-thread-0");
- assertFailingSubmitThreadName(pool, "test-thread-0");
- assertThat(threadFactory.getThreadCount(), is(1));
-
- letThreadsDie();
-
- // thread executes one more task after expiring
- assertFailingSubmitThreadName(pool, "test-thread-0");
- assertFailingSubmitThreadName(pool, "test-thread-1");
- assertThat(threadFactory.getThreadCount(), is(2));
-
- assertActiveThreads(threadFactory, "test-thread-1");
- assertExpiredThreads(threadFactory, "test-thread-0");
+import org.apache.sling.commons.threads.impl.ThreadLocalChangeListener.Mode;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ThreadPoolExecutorCleaningThreadLocalsTest {
+
+ public ThreadPoolExecutorCleaningThreadLocals pool;
+
+ @Mock
+ public ThreadLocalChangeListener listener;
+
+
+ @Before
+ public void setUp() {
+ final BlockingQueue<Runnable> queue = new
ArrayBlockingQueue<Runnable>(20);
+ final RejectedExecutionHandler rejectionHandler = new
ThreadPoolExecutor.AbortPolicy();
+ pool = new ThreadPoolExecutorCleaningThreadLocals(
+ 1, 1, 100, TimeUnit.MILLISECONDS,
+ queue, Executors.defaultThreadFactory(), rejectionHandler,
listener);
}
@Test
- public void shouldLetMultipleThreadsDieAfterExpiry()
- throws ExecutionException, InterruptedException {
-
- final TrackingThreadFactory threadFactory = context.getThreadFactory();
- final ThreadExpiringThreadPool pool = context.getPool();
- pool.setCorePoolSize(3);
- pool.setMaximumPoolSize(3);
-
- assertParallelExecutionsByThread(pool, "test-thread-0",
"test-thread-1", "test-thread-2");
- assertThat(threadFactory.getThreadCount(), is(3));
-
- letThreadsDie();
- // thread executes one more task after expiring
- executeParallelTasks(pool, 3);
-
- assertParallelExecutionsByThread(pool, "test-thread-3",
"test-thread-4", "test-thread-5");
- assertThat(threadFactory.getThreadCount(), is(6));
-
- assertActiveThreads(threadFactory, "test-thread-3", "test-thread-4",
"test-thread-5");
- assertExpiredThreads(threadFactory, "test-thread-0", "test-thread-1",
"test-thread-2");
- }
-
- private void assertActiveThreads(final TrackingThreadFactory factory,
final String... names) {
- assertThat("Active threads", factory.getActiveThreads(),
equalTo(asSet(names)));
- }
-
- private void assertExpiredThreads(final TrackingThreadFactory factory,
final String... names) {
- assertThat("Expired threads", factory.getExpiredThreads(),
equalTo(asSet(names)));
- }
-
- private Set<String> asSet(final String... items) {
- return new HashSet<String>(asList(items));
- }
-
- private void assertParallelExecutionsByThread(final ExecutorService pool,
final String... expectedThreads)
- throws InterruptedException {
-
- final Task[] tasks = executeParallelTasks(pool, 3);
- final List<String> threadNames = new ArrayList<String>();
- for (final Task task : tasks) {
- threadNames.add(task.executedBy);
- }
- for (final String expectedThread : expectedThreads) {
- assertTrue("No task was executed by " + expectedThread,
- threadNames.remove(expectedThread));
- assertFalse("Multiple tasks were executed by " + expectedThread,
- threadNames.contains(expectedThread));
- }
- }
-
- private Task[] executeParallelTasks(final ExecutorService pool, final int
number)
- throws InterruptedException {
- final Task[] tasks = new Task[number];
- final CountDownLatch latch = new CountDownLatch(number);
- for (int i = 0; i < tasks.length; i++) {
- tasks[i] = new Task(latch);
- pool.execute(tasks[i]);
- }
- pool.awaitTermination(MAX_THREAD_AGE_MS, TimeUnit.MILLISECONDS);
- return tasks;
+ public void testThreadLocalBeingCleanedUp() throws InterruptedException,
ExecutionException {
+ assertTaskDoesNotSeeOldThreadLocals("test");
+ assertTaskDoesNotSeeOldThreadLocals("test2");
+ // verify mock interactions (at least the additions from the first
task should be visible to the listener now)
+ Mockito.verify(listener).changed(Matchers.eq(Mode.ADDED),
Matchers.any(Thread.class), Matchers.eq(ThreadLocalTask.threadLocalVariable),
Matchers.eq("test"));
+ // no thread locals should have been removed
+ Mockito.verify(listener,
Mockito.times(0)).changed(Matchers.eq(Mode.REMOVED),
Matchers.any(Thread.class), Matchers.eq(ThreadLocalTask.threadLocalVariable),
Matchers.anyString());
}
- private void assertExecutionByThread(final ExecutorService pool, final
String expectedThread)
- throws ExecutionException, InterruptedException {
- final Task task = new Task();
+ private void assertTaskDoesNotSeeOldThreadLocals(String value) throws
InterruptedException, ExecutionException {
+ ThreadLocalTask task = new ThreadLocalTask(value);
pool.submit(task).get();
- assertEquals("Thread name", expectedThread, task.executedBy);
+ Assert.assertNull(task.getOldValue());
}
- private void assertFailingSubmitThreadName(final ExecutorService pool,
final String expectedThread)
- throws ExecutionException, InterruptedException {
- final Task task = new ExceptionTask();
- try {
- pool.submit(task).get();
- } catch (ExecutionException e) {
- if (!e.getCause().getMessage().startsWith("ExceptionTask #")) {
- LOG.error("Unexpected exception: ", e);
- fail("Unexpected exception: " + e.getMessage());
- }
- }
- assertEquals("Thread name", expectedThread, task.executedBy);
- }
+ private static class ThreadLocalTask implements Runnable {
+ static final ThreadLocal<String> threadLocalVariable = new
ThreadLocal<String>();
- private void letThreadsDie() throws InterruptedException {
- TimeUnit.MILLISECONDS.sleep(MAX_THREAD_AGE_MS * 2);
- }
+ private final String newValue;
+ private volatile String oldValue;
- private static class Task implements Runnable {
-
- private static int counter = 0;
-
- protected final int count;
-
- private final CountDownLatch mayFinish;
-
- protected String executedBy;
-
- Task() {
- this(new CountDownLatch(0));
+ public ThreadLocalTask(String newValue) {
+ this.newValue = newValue;
}
- Task(final CountDownLatch latch) {
- this.mayFinish = latch;
- this.count = counter++;
- }
-
- @Override
- public void run() {
- mayFinish.countDown();
- final Thread thread = Thread.currentThread();
- try {
- mayFinish.await();
- } catch (InterruptedException e) {
- thread.interrupt();
- }
- LOG.info("{} #{} running in thread {}",
- new Object[] {getClass().getSimpleName(), count, thread});
- executedBy = thread.getName();
- }
- }
-
- private static class ExceptionTask extends Task {
@Override
public void run() {
- super.run();
- throw new RuntimeException("ExceptionTask #" + count);
+ oldValue = threadLocalVariable.get();
+ // set thread local to a new value
+ threadLocalVariable.set(newValue);
}
- }
-
- private static class TrackingThreadFactory implements ThreadFactory {
-
- private final ThreadGroup group;
- private final AtomicInteger threadCount = new AtomicInteger(0);
-
- private final List<Thread> threadHistory = new
CopyOnWriteArrayList<Thread>();
-
- public TrackingThreadFactory() {
- group = Thread.currentThread().getThreadGroup();
- }
-
- public int getThreadCount() {
- return threadHistory.size();
- }
-
- public Set<String> getActiveThreads() {
- letThreadsDie();
- final HashSet<String> active = new HashSet<String>();
- for (final Thread thread : threadHistory) {
- if (thread.isAlive()) {
- active.add(thread.getName());
- }
- }
- return active;
- }
-
- public Set<String> getExpiredThreads() {
- letThreadsDie();
- final HashSet<String> expired = new HashSet<String>();
- for (final Thread thread : threadHistory) {
- if (!thread.isAlive()) {
- expired.add(thread.getName());
- }
- }
- return expired;
- }
-
- /**
- * This avoids a race condition where a thread has been evicted from
the pool but is still alive becuase it evicted itself.
- * JDK8 java.util.concurrent.ThreadPoolExecutor does this. The 15ms
assumes the process takes no more than 15ms to complete.
- * That is OS and VM dependent.
- */
- public void letThreadsDie() {
- try {
- Thread.sleep(15);
- } catch ( Exception e) {
- LOG.debug(e.getMessage(),e);
- }
- }
-
- @Override
- public Thread newThread(final Runnable r) {
- final Thread thread = new Thread(group, r, "test-thread-" +
threadCount.getAndIncrement());
- thread.setDaemon(false);
- thread.setPriority(Thread.NORM_PRIORITY);
- threadHistory.add(thread);
- LOG.info("Created thread {}", thread.getName());
- return thread;
- }
- }
-
- public static class ThreadPoolContext extends ExternalResource {
-
- public TrackingThreadFactory getThreadFactory() {
- return threadFactory;
- }
-
- public ThreadExpiringThreadPool getPool() {
- return pool;
- }
-
- private TrackingThreadFactory threadFactory;
-
- private ThreadExpiringThreadPool pool;
-
- @Override
- protected void before() throws Throwable {
- Task.counter = 0; // reset counter
- final BlockingQueue<Runnable> queue = new
ArrayBlockingQueue<Runnable>(20);
- final RejectedExecutionHandler rejectionHandler = new
ThreadPoolExecutor.AbortPolicy();
- threadFactory = new TrackingThreadFactory();
- pool = new ThreadExpiringThreadPool(
- 1, 1,
- MAX_THREAD_AGE_MS, TimeUnit.MILLISECONDS,
- 1000, TimeUnit.MILLISECONDS,
- queue, threadFactory, rejectionHandler);
- }
-
- @Override
- protected void after() {
- threadFactory = null;
- pool = null;
+ public String getOldValue() {
+ return oldValue;
}
}
}