Author: kwin Date: Wed Apr 12 08:43:20 2017 New Revision: 1791091 URL: http://svn.apache.org/viewvc?rev=1791091&view=rev Log: SLING-6261 clean up thread locals for threads being reused in a thread pool
Added: sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalChangeListener.java (with props) sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java (with props) sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java - copied, changed from r1790530, sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPool.java sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocalsTest.java - copied, changed from r1790530, sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPoolTest.java Removed: sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPool.java sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPoolTest.java Modified: sling/trunk/bundles/commons/threads/pom.xml sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ModifiableThreadPoolConfig.java sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ThreadPoolConfig.java sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPool.java sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPoolManager.java sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolMBeanImpl.java sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/jmx/ThreadPoolMBean.java Modified: sling/trunk/bundles/commons/threads/pom.xml URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/pom.xml?rev=1791091&r1=1791090&r2=1791091&view=diff 
============================================================================== --- sling/trunk/bundles/commons/threads/pom.xml (original) +++ sling/trunk/bundles/commons/threads/pom.xml Wed Apr 12 08:43:20 2017 @@ -88,5 +88,10 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.9.5</version> + </dependency> </dependencies> </project> Modified: sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ModifiableThreadPoolConfig.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ModifiableThreadPoolConfig.java?rev=1791091&r1=1791090&r2=1791091&view=diff ============================================================================== --- sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ModifiableThreadPoolConfig.java (original) +++ sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ModifiableThreadPoolConfig.java Wed Apr 12 08:43:20 2017 @@ -72,8 +72,12 @@ public final class ModifiableThreadPoolC /** The queue size */ private int queueSize = -1; - /** Max age of a thread in milliseconds */ - private long maxThreadAge = TimeUnit.MINUTES.toMillis(5); + /** Max age of a thread in milliseconds + * @deprecated Since version 3.4.0 always returns -1 as threads are no longer retired + * but instead the thread locals are cleaned up (<a href="https://issues.apache.org/jira/browse/SLING-6261">SLING-6261</a>) + */ + @Deprecated + private long maxThreadAge = -1; /** The keep alive time. 
*/ private long keepAliveTime = 60000L; @@ -169,9 +173,11 @@ public final class ModifiableThreadPoolC } - /** + /* + * (non-Javadoc) * @see org.apache.sling.commons.threads.ThreadPoolConfig#getMaxThreadAge() */ + @Override public long getMaxThreadAge() { return maxThreadAge; } @@ -179,6 +185,8 @@ public final class ModifiableThreadPoolC /** * Set the max thread age. * @param maxThreadAge New max thread age in milliseconds. + * @deprecated Since version 3.4.0 should no longer be called, as threads are no longer retired + * but instead the thread locals are cleaned up (<a href="https://issues.apache.org/jira/browse/SLING-6261">SLING-6261</a>) */ public void setMaxThreadAge(final long maxThreadAge) { this.maxThreadAge = maxThreadAge; Modified: sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ThreadPoolConfig.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ThreadPoolConfig.java?rev=1791091&r1=1791090&r2=1791091&view=diff ============================================================================== --- sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ThreadPoolConfig.java (original) +++ sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/ThreadPoolConfig.java Wed Apr 12 08:43:20 2017 @@ -61,7 +61,10 @@ public interface ThreadPoolConfig { /** * Return the maximum age before a thread is retired. * @return The maximum age of a thread in milliseconds. 
+ * @deprecated Since version 3.4.0 always returns -1 as threads are no longer retired + * but instead the thread locals are cleaned up (<a href="https://issues.apache.org/jira/browse/SLING-6261">SLING-6261</a>) */ + @Deprecated long getMaxThreadAge(); /** Modified: sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPool.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPool.java?rev=1791091&r1=1791090&r2=1791091&view=diff ============================================================================== --- sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPool.java (original) +++ sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPool.java Wed Apr 12 08:43:20 2017 @@ -132,19 +132,35 @@ public class DefaultThreadPool handler = new ThreadPoolExecutor.CallerRunsPolicy(); break; } - - this.executor = new ThreadExpiringThreadPool(this.configuration.getMinPoolSize(), - this.configuration.getMaxPoolSize(), - this.configuration.getMaxThreadAge(), - TimeUnit.MILLISECONDS, - this.configuration.getKeepAliveTime(), - TimeUnit.MILLISECONDS, - queue, - threadFactory, - handler); + try { + this.executor = new ThreadPoolExecutorCleaningThreadLocals(this.configuration.getMinPoolSize(), + this.configuration.getMaxPoolSize(), + this.configuration.getKeepAliveTime(), + TimeUnit.MILLISECONDS, + queue, + threadFactory, + handler, + new LoggingThreadLocalChangeListener()); + } catch (IllegalStateException e) { + logger.warn("Unsupported JRE, cannot register ThreadPoolExecutorCleaningThreadLocals due to '{}', fall back to regular ThreadPoolExecutor", e.getMessage(), e); + this.executor = new ThreadPoolExecutor(this.configuration.getMinPoolSize(), + this.configuration.getMaxPoolSize(), + this.configuration.getKeepAliveTime(), + TimeUnit.MILLISECONDS, + 
queue, + threadFactory, + handler); + } this.logger.info("Thread pool [{}] initialized.", name); } + private static class LoggingThreadLocalChangeListener implements ThreadLocalChangeListener { + @Override + public void changed(Mode mode, Thread thread, ThreadLocal<?> threadLocal, Object value) { + LOGGER.debug("Thread '{}' {} ThreadLocal {} with value {}", thread, mode, threadLocal.getClass(), value); + } + } + /** * @see org.apache.sling.commons.threads.ThreadPool#getName() */ Modified: sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPoolManager.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPoolManager.java?rev=1791091&r1=1791090&r2=1791091&view=diff ============================================================================== --- sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPoolManager.java (original) +++ sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPoolManager.java Wed Apr 12 08:43:20 2017 @@ -98,9 +98,6 @@ public class DefaultThreadPoolManager if ( props.get(ModifiableThreadPoolConfig.PROPERTY_QUEUE_SIZE) != null ) { config.setQueueSize((Integer)props.get(ModifiableThreadPoolConfig.PROPERTY_QUEUE_SIZE)); } - if ( props.get(ModifiableThreadPoolConfig.PROPERTY_MAX_THREAD_AGE) != null ) { - config.setMaxThreadAge((Long) props.get(ModifiableThreadPoolConfig.PROPERTY_MAX_THREAD_AGE)); - } if ( props.get(ModifiableThreadPoolConfig.PROPERTY_KEEP_ALIVE_TIME) != null ) { config.setKeepAliveTime((Long)props.get(ModifiableThreadPoolConfig.PROPERTY_KEEP_ALIVE_TIME)); } Added: sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalChangeListener.java URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalChangeListener.java?rev=1791091&view=auto ============================================================================== --- sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalChangeListener.java (added) +++ sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalChangeListener.java Wed Apr 12 08:43:20 2017 @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sling.commons.threads.impl; + +import java.lang.invoke.MethodHandles; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Interface for listeners being attached to {@link ThreadLocalCleaner}. 
+ */ +public interface ThreadLocalChangeListener { + + Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + void changed(Mode mode, Thread thread, ThreadLocal<?> threadLocal, Object value); + + enum Mode { + ADDED, REMOVED + } +} Propchange: sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalChangeListener.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java?rev=1791091&view=auto ============================================================================== --- sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java (added) +++ sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java Wed Apr 12 08:43:20 2017 @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sling.commons.threads.impl; + +import java.lang.ref.Reference; +import java.lang.reflect.Field; +import java.util.Arrays; + +import org.apache.sling.commons.threads.impl.ThreadLocalChangeListener.Mode; + +/** Notifies a {@link ThreadLocalChangeListener} about changes on a thread local storage. In addition it removes all references to variables + * being added to the thread local storage while the cleaner was running with its {@link #cleanup()} method. + * + * @see <a href="http://www.javaspecialists.eu/archive/Issue229.html">JavaSpecialist.eu - Cleaning ThreadLocals</a> */ +public class ThreadLocalCleaner { + private final ThreadLocalChangeListener listener; + + /* Reflection fields */ + /** this field is in class {@link Thread} and is of type {@code ThreadLocal.ThreadLocalMap} */ + private static Field threadLocalsField; + /** this field is in class {@link Thread} and is of type {@code ThreadLocal.ThreadLocalMap} */ + private static Field inheritableThreadLocalsField; + private static Class<?> threadLocalMapClass; + /** this field is in class {@code ThreadLocal.ThreadLocalMap} and contains an array of {@code ThreadLocal.ThreadLocalMap.Entry} instances */ + private static Field tableField; + private static Class<?> threadLocalMapEntryClass; + /** this field is in class {@code ThreadLocal.ThreadLocalMap.Entry} and contains an object referencing the actual thread local + * variable */ + private static Field threadLocalEntryValueField; + private static IllegalStateException reflectionException; + + public ThreadLocalCleaner(ThreadLocalChangeListener listener) { + if (threadLocalsField == null) { + initReflectionFields(); + } + this.listener = listener; + saveOldThreadLocals(); + } + + private static synchronized void initReflectionFields() throws IllegalStateException { + // check if previous initialization led to an exception + if (reflectionException != null) { + throw reflectionException; + } + // check if initialized + if 
(threadLocalsField == null) { + try { + threadLocalsField = field(Thread.class, "threadLocals"); + inheritableThreadLocalsField = field(Thread.class, "inheritableThreadLocals"); + threadLocalMapClass = inner(ThreadLocal.class, "ThreadLocalMap"); + tableField = field(threadLocalMapClass, "table"); + threadLocalMapEntryClass = inner(threadLocalMapClass, "Entry"); + threadLocalEntryValueField = field(threadLocalMapEntryClass, "value"); + } catch (NoSuchFieldException e) { + reflectionException = new IllegalStateException( + "Could not locate threadLocals field in class Thread. " + + "Will not be able to clear thread locals: " + e); + throw reflectionException; + } + } + } + + public void cleanup() { + // the first two diff calls are only to notify the listener, the actual cleanup is done by restoreOldThreadLocals + diff(threadLocalsField, copyOfThreadLocals.get()); + diff(inheritableThreadLocalsField, copyOfInheritableThreadLocals.get()); + restoreOldThreadLocals(); + } + + /** Notifies the {@link ThreadLocalChangeListener} about changes on thread local variables for the current thread. 
+ * + * @param field + * @param backup */ + private void diff(Field field, Reference<?>[] backup) { + try { + Thread thread = Thread.currentThread(); + Object threadLocals = field.get(thread); + if (threadLocals == null) { + if (backup != null) { + for (Reference<?> reference : backup) { + changed(thread, reference, Mode.REMOVED); + } + } + return; + } + + Reference<?>[] current = (Reference<?>[]) tableField.get(threadLocals); + if (backup == null) { + for (Reference<?> reference : current) { + changed(thread, reference, Mode.ADDED); + } + } else { + // nested loop - both arrays *should* be relatively small + next: for (Reference<?> curRef : current) { + if (curRef != null) { + if (curRef.get() == copyOfThreadLocals || + curRef.get() == copyOfInheritableThreadLocals) { + continue next; + } + for (Reference<?> backupRef : backup) { + if (curRef == backupRef) + continue next; + } + // could not find it in backup - added + changed(thread, curRef, Mode.ADDED); + } + } + next: for (Reference<?> backupRef : backup) { + for (Reference<?> curRef : current) { + if (curRef == backupRef) + continue next; + } + // could not find it in current - removed + changed(thread, backupRef, Mode.REMOVED); + } + } + } catch (IllegalAccessException e) { + throw new IllegalStateException("Access denied", e); + } + } + + private void changed(Thread thread, Reference<?> reference, + ThreadLocalChangeListener.Mode mode) + throws IllegalAccessException { + listener.changed(mode, + thread, (ThreadLocal<?>) reference.get(), + threadLocalEntryValueField.get(reference)); + } + + /** @param c the class containing the field + * @param name the name of the field + * @return the field from the given class with the given name (made accessible) + * @throws NoSuchFieldException */ + private static Field field(Class<?> c, String name) + throws NoSuchFieldException { + Field field = c.getDeclaredField(name); + field.setAccessible(true); + return field; + } + + /** @param clazz the class containing the 
inner class + * @param name the name of the inner class + * @return the class with the given name, declared as inner class of the given class */ + private static Class<?> inner(Class<?> clazz, String name) { + for (Class<?> c : clazz.getDeclaredClasses()) { + if (c.getSimpleName().equals(name)) { + return c; + } + } + throw new IllegalStateException( + "Could not find inner class " + name + " in " + clazz); + } + + private static final ThreadLocal<Reference<?>[]> copyOfThreadLocals = new ThreadLocal<>(); + + private static final ThreadLocal<Reference<?>[]> copyOfInheritableThreadLocals = new ThreadLocal<>(); + + private static void saveOldThreadLocals() { + copyOfThreadLocals.set(copy(threadLocalsField)); + copyOfInheritableThreadLocals.set(copy(inheritableThreadLocalsField)); + } + + private static Reference<?>[] copy(Field field) { + try { + Thread thread = Thread.currentThread(); + Object threadLocals = field.get(thread); + if (threadLocals == null) + return null; + Reference<?>[] table = (Reference<?>[]) tableField.get(threadLocals); + return Arrays.copyOf(table, table.length); + } catch (IllegalAccessException e) { + throw new IllegalStateException("Access denied", e); + } + } + + private static void restoreOldThreadLocals() { + try { + restore(inheritableThreadLocalsField, copyOfInheritableThreadLocals.get()); + restore(threadLocalsField, copyOfThreadLocals.get()); + } finally { + copyOfThreadLocals.remove(); + copyOfInheritableThreadLocals.remove(); + } + } + + private static void restore(Field field, Object value) { + try { + Thread thread = Thread.currentThread(); + if (value == null) { + field.set(thread, null); + } else { + tableField.set(field.get(thread), value); + } + } catch (IllegalAccessException e) { + throw new IllegalStateException("Access denied", e); + } + } + + static { + // TODO: move to a place where the exception can be caught! 
+ + } +} \ No newline at end of file Propchange: sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Copied: sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java (from r1790530, sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPool.java) URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java?p2=sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java&p1=sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPool.java&r1=1790530&r2=1791091&rev=1791091&view=diff ============================================================================== --- sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPool.java (original) +++ sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java Wed Apr 12 08:43:20 2017 @@ -16,131 +16,50 @@ */ package org.apache.sling.commons.threads.impl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import java.lang.invoke.MethodHandles; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -/** - * An extension of ThreadPoolExecutor, which keeps track of the age - * of the worker threads and expires them when they get older than - * a specified max-age. 
- * <br/> - * To be precise, a thread is expired when it finishes processing - * a task and its max-age has been exceeded at that time. I.e. if a - * thread is idle past its expiry, it may still process a single - * task before it is expired. - */ -public class ThreadExpiringThreadPool extends ThreadPoolExecutor { +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - private static final Logger LOG = LoggerFactory.getLogger(ThreadExpiringThreadPool.class); +/** + * An extension of ThreadPoolExecutor which automatically gets rid of all {@link ThreadLocal} references before a thread is going to be reused. + * @see ThreadLocalCleaner + */ +public class ThreadPoolExecutorCleaningThreadLocals extends ThreadPoolExecutor { + private final ThreadLocalChangeListener listener; - /** - * Map from thread-id to the time (in milliseconds) when a thread was first used to - * process a task. This is used to look determine when a thread is to be expired. - */ - private final ConcurrentHashMap<Long, Long> threadStartTimes; - - /** - * Thread max-age in milliseconds. - */ - private final long maxThreadAge; - - /** - * Convenience flag indicating whether threads expire or not. - * This is equivalent to {@code maxThreadAge >= 0}. - */ - private final boolean enableThreadExpiry; - - /** - * Marker exception object thrown to terminate threads that have - * reached or exceeded their max-age. This exception is intentionally - * used for (minimal) control flow, i.e. the {@code ThreadPoolExecutor} - * will dispose of any thread that threw an exception and create a new - * one in its stead. This exception should never show up in any logs, - * otherwise it is a bug. 
- */ - private final RuntimeException expiredThreadException; - - public ThreadExpiringThreadPool( - final int corePoolSize, - final int maximumPoolSize, - final long maxThreadAge, - final TimeUnit maxThreadAgeUnit, - final long keepAliveTime, - final TimeUnit keepAliveTimeUnit, - final BlockingQueue<Runnable> workQueue, - final ThreadFactory threadFactory, - final RejectedExecutionHandler handler - ) { - super(corePoolSize, maximumPoolSize, keepAliveTime, keepAliveTimeUnit, workQueue, threadFactory, handler); - this.threadStartTimes = new ConcurrentHashMap<Long, Long>(maximumPoolSize); - this.maxThreadAge = TimeUnit.MILLISECONDS.convert(maxThreadAge, maxThreadAgeUnit); - this.enableThreadExpiry = maxThreadAge >= 0; - this.expiredThreadException = new RuntimeException("Kill old thread"); - } + private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - @Override - protected void beforeExecute(final Thread thread, final Runnable runnable) { - if (enableThreadExpiry) { - recordStartTime(thread); - } - super.beforeExecute(thread, runnable); + public ThreadPoolExecutorCleaningThreadLocals(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue<Runnable> workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler, + ThreadLocalChangeListener listener) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, + workQueue, threadFactory, handler); + this.listener = listener; } - private void recordStartTime(final Thread thread) { - final long threadId = thread.getId(); - if (threadStartTimes.putIfAbsent(threadId, System.currentTimeMillis()) == null) { - LOG.debug("{} used for the first time.", thread); - - // The uncaught exception handler makes sure that the exception - // signalling the death of a thread is swallowed. All other - // Throwables are handed to the originalHandler. 
- final Thread.UncaughtExceptionHandler originalHandler = thread.getUncaughtExceptionHandler(); - thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(final Thread thread, final Throwable throwable) { - // first reset the original uncaught exception handler - just as a precaution - thread.setUncaughtExceptionHandler(originalHandler); - - // ignore expected exception thrown to terminate the thread - if (throwable == expiredThreadException) { - return; - } - - // delegate any other exceptions to the original uncaught exception handler - if (originalHandler != null) { - originalHandler.uncaughtException(thread, throwable); - } - } - }); - } - } + private static final ThreadLocal<ThreadLocalCleaner> local = new ThreadLocal<>(); - @Override - protected void afterExecute(final Runnable runnable, final Throwable throwable) { - super.afterExecute(runnable, throwable); - if (throwable == null && enableThreadExpiry) { - checkMaxThreadAge(Thread.currentThread()); - } + protected void beforeExecute(Thread t, Runnable r) { + LOGGER.debug("Collecting changes to ThreadLocal for thread {} from now on...", t); + ThreadLocalCleaner cleaner = new ThreadLocalCleaner(listener); + local.set(cleaner); } - private void checkMaxThreadAge(final Thread thread) { - final long now = System.currentTimeMillis(); - final long threadId = thread.getId(); - final Long started = threadStartTimes.get(threadId); - if (started != null && now >= started + maxThreadAge) { - final long delta = now - (started + maxThreadAge); - LOG.debug("{} exceeded its max age by {}ms and will be replaced.", thread, delta); - threadStartTimes.remove(threadId); - - // throw marker exception to kill this thread and thus trigger creation of a new one - throw expiredThreadException; - } + protected void afterExecute(Runnable r, Throwable t) { + LOGGER.debug("Cleaning up thread locals for thread {}...", Thread.currentThread()); + ThreadLocalCleaner cleaner = 
local.get(); + local.remove(); + cleaner.cleanup(); } } Modified: sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolMBeanImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolMBeanImpl.java?rev=1791091&r1=1791090&r2=1791091&view=diff ============================================================================== --- sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolMBeanImpl.java (original) +++ sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolMBeanImpl.java Wed Apr 12 08:43:20 2017 @@ -100,6 +100,7 @@ class ThreadPoolMBeanImpl extends Standa } } + @Deprecated public long getMaxThreadAge() { return this.entry.getConfig().getMaxThreadAge(); } Modified: sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/jmx/ThreadPoolMBean.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/jmx/ThreadPoolMBean.java?rev=1791091&r1=1791090&r2=1791091&view=diff ============================================================================== --- sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/jmx/ThreadPoolMBean.java (original) +++ sling/trunk/bundles/commons/threads/src/main/java/org/apache/sling/commons/threads/jmx/ThreadPoolMBean.java Wed Apr 12 08:43:20 2017 @@ -86,7 +86,10 @@ public interface ThreadPoolMBean { * Return the configured max thread age. * * @return The configured max thread age. 
+ * @deprecated Since version 1.1.1 always returns -1 as threads are no longer retired + * but instead the thread locals are cleaned up (<a href="https://issues.apache.org/jira/browse/SLING-6261">SLING-6261</a>) */ + @Deprecated long getMaxThreadAge(); /** Copied: sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocalsTest.java (from r1790530, sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPoolTest.java) URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocalsTest.java?p2=sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocalsTest.java&p1=sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPoolTest.java&r1=1790530&r2=1791091&rev=1791091&view=diff ============================================================================== --- sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadExpiringThreadPoolTest.java (original) +++ sling/trunk/bundles/commons/threads/src/test/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocalsTest.java Wed Apr 12 08:43:20 2017 @@ -16,402 +16,77 @@ */ package org.apache.sling.commons.threads.impl; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExternalResource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import 
java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static java.util.Arrays.asList; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class ThreadExpiringThreadPoolTest { - - private static final Logger LOG = LoggerFactory.getLogger(ThreadExpiringThreadPoolTest.class); - - private static final int MAX_THREAD_AGE_MS = 90; // let threads expire after this many ms - - @Rule - public ThreadPoolContext context = new ThreadPoolContext(); - - - /** - * Attempts to isolate failures that happen > 0.2% of the time related to the - * way in which the underlying thread pool behaves. This is not normally run as - * a test , but use it if you want to isolate a rare failure. - */ - //@Test - public void shouldLetMultipleThreadsDieAfterExpiryMulti() { - int fail = 0; - int success = 0; - for (int i = 0; i < 500; i++) { - try { - LOG.info("Running {} ", i); - context = new ThreadPoolContext(); - context.before(); - shouldLetMultipleThreadsDieAfterExpiry(); - success++; - } catch (Throwable e) { - LOG.error("Failed ", e); - fail++; - fail("Race condition encountered"); - } finally { - context.after(); - } - } - LOG.info("Failed {} sucess {}", fail, success); - assertEquals(0, fail); - } - /** - * Attempts to isolate failures that happen > 0.2% of the time related to the - * way in which the underlying thread pool behaves. This is not normally run as - * a test, but use it if you want to isolate a rare failure. 
- */ - // @Test - public void shouldCreateNewThreadAfterExpiryMulti() { - - int fail = 0; - int success = 0; - for (int i = 0; i < 500; i++) { - try { - LOG.info("Running {} ", i); - context = new ThreadPoolContext(); - context.before(); - shouldCreateNewThreadAfterExpiry(); - success++; - } catch (Throwable e ) { - LOG.error("Failed ",e); - fail++; - fail("Race condition encountered"); - } finally { - context.after(); - } - } - LOG.info("Failed {} sucess {}", fail, success); - assertEquals(0, fail); - } - /** - * Attempts to isolate failures that happen > 0.2% of the time related to the - * way in which the underlying thread pool behaves. This is not normally run as - * a test, but use it if you want to isolate a rare failure. - */ - // @Test - public void shouldCreateNewThreadAfterExpiryForFailingTasksMulti() { - - int fail = 0; - int success = 0; - for (int i = 0; i < 500; i++) { - try { - LOG.info("Running {} ", i); - context = new ThreadPoolContext(); - context.before(); - shouldCreateNewThreadAfterExpiryForFailingTasks(); - success++; - } catch (Throwable e ) { - LOG.error("Failed ",e); - fail++; - fail("Race condition encountered"); - } finally { - context.after(); - } - } - LOG.info("Failed {} sucess {}", fail, success); - assertEquals(0, fail); - - } - - - @Test - public void shouldCreateNewThreadAfterExpiry() throws InterruptedException, ExecutionException { - final TrackingThreadFactory threadFactory = context.getThreadFactory(); - final ThreadExpiringThreadPool pool = context.getPool(); - - assertThat(threadFactory.getThreadCount(), is(0)); - - assertExecutionByThread(pool, "test-thread-0"); - assertExecutionByThread(pool, "test-thread-0"); - assertExecutionByThread(pool, "test-thread-0"); - assertThat(threadFactory.getThreadCount(), is(1)); - - letThreadsDie(); - - // thread executes one more task after expiring - assertExecutionByThread(pool, "test-thread-0"); - assertExecutionByThread(pool, "test-thread-1"); - 
assertThat(threadFactory.getThreadCount(), is(2)); - assertActiveThreads(threadFactory, "test-thread-1"); - assertExpiredThreads(threadFactory, "test-thread-0"); - } - - @Test - public void shouldCreateNewThreadAfterExpiryForFailingTasks() throws InterruptedException, ExecutionException { - final TrackingThreadFactory threadFactory = context.getThreadFactory(); - final ThreadExpiringThreadPool pool = context.getPool(); - - assertThat(threadFactory.getThreadCount(), is(0)); - - assertFailingSubmitThreadName(pool, "test-thread-0"); - assertFailingSubmitThreadName(pool, "test-thread-0"); - assertFailingSubmitThreadName(pool, "test-thread-0"); - assertThat(threadFactory.getThreadCount(), is(1)); - - letThreadsDie(); - - // thread executes one more task after expiring - assertFailingSubmitThreadName(pool, "test-thread-0"); - assertFailingSubmitThreadName(pool, "test-thread-1"); - assertThat(threadFactory.getThreadCount(), is(2)); - - assertActiveThreads(threadFactory, "test-thread-1"); - assertExpiredThreads(threadFactory, "test-thread-0"); +import org.apache.sling.commons.threads.impl.ThreadLocalChangeListener.Mode; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ThreadPoolExecutorCleaningThreadLocalsTest { + + public ThreadPoolExecutorCleaningThreadLocals pool; + + @Mock + public ThreadLocalChangeListener listener; + + + @Before + public void setUp() { + final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(20); + final RejectedExecutionHandler rejectionHandler = new ThreadPoolExecutor.AbortPolicy(); + pool = new ThreadPoolExecutorCleaningThreadLocals( + 1, 1, 100, TimeUnit.MILLISECONDS, + queue, Executors.defaultThreadFactory(), rejectionHandler, listener); } @Test - public void 
shouldLetMultipleThreadsDieAfterExpiry() - throws ExecutionException, InterruptedException { - - final TrackingThreadFactory threadFactory = context.getThreadFactory(); - final ThreadExpiringThreadPool pool = context.getPool(); - pool.setCorePoolSize(3); - pool.setMaximumPoolSize(3); - - assertParallelExecutionsByThread(pool, "test-thread-0", "test-thread-1", "test-thread-2"); - assertThat(threadFactory.getThreadCount(), is(3)); - - letThreadsDie(); - // thread executes one more task after expiring - executeParallelTasks(pool, 3); - - assertParallelExecutionsByThread(pool, "test-thread-3", "test-thread-4", "test-thread-5"); - assertThat(threadFactory.getThreadCount(), is(6)); - - assertActiveThreads(threadFactory, "test-thread-3", "test-thread-4", "test-thread-5"); - assertExpiredThreads(threadFactory, "test-thread-0", "test-thread-1", "test-thread-2"); - } - - private void assertActiveThreads(final TrackingThreadFactory factory, final String... names) { - assertThat("Active threads", factory.getActiveThreads(), equalTo(asSet(names))); - } - - private void assertExpiredThreads(final TrackingThreadFactory factory, final String... names) { - assertThat("Expired threads", factory.getExpiredThreads(), equalTo(asSet(names))); - } - - private Set<String> asSet(final String... items) { - return new HashSet<String>(asList(items)); - } - - private void assertParallelExecutionsByThread(final ExecutorService pool, final String... 
expectedThreads) - throws InterruptedException { - - final Task[] tasks = executeParallelTasks(pool, 3); - final List<String> threadNames = new ArrayList<String>(); - for (final Task task : tasks) { - threadNames.add(task.executedBy); - } - for (final String expectedThread : expectedThreads) { - assertTrue("No task was executed by " + expectedThread, - threadNames.remove(expectedThread)); - assertFalse("Multiple tasks were executed by " + expectedThread, - threadNames.contains(expectedThread)); - } - } - - private Task[] executeParallelTasks(final ExecutorService pool, final int number) - throws InterruptedException { - final Task[] tasks = new Task[number]; - final CountDownLatch latch = new CountDownLatch(number); - for (int i = 0; i < tasks.length; i++) { - tasks[i] = new Task(latch); - pool.execute(tasks[i]); - } - pool.awaitTermination(MAX_THREAD_AGE_MS, TimeUnit.MILLISECONDS); - return tasks; + public void testThreadLocalBeingCleanedUp() throws InterruptedException, ExecutionException { + assertTaskDoesNotSeeOldThreadLocals("test"); + assertTaskDoesNotSeeOldThreadLocals("test2"); + // verify mock interactions (at least the additions from the first task should be visible to the listener now) + Mockito.verify(listener).changed(Matchers.eq(Mode.ADDED), Matchers.any(Thread.class), Matchers.eq(ThreadLocalTask.threadLocalVariable), Matchers.eq("test")); + // no thread locals should have been removed + Mockito.verify(listener, Mockito.times(0)).changed(Matchers.eq(Mode.REMOVED), Matchers.any(Thread.class), Matchers.eq(ThreadLocalTask.threadLocalVariable), Matchers.anyString()); } - private void assertExecutionByThread(final ExecutorService pool, final String expectedThread) - throws ExecutionException, InterruptedException { - final Task task = new Task(); + private void assertTaskDoesNotSeeOldThreadLocals(String value) throws InterruptedException, ExecutionException { + ThreadLocalTask task = new ThreadLocalTask(value); pool.submit(task).get(); - 
assertEquals("Thread name", expectedThread, task.executedBy); + Assert.assertNull(task.getOldValue()); } - private void assertFailingSubmitThreadName(final ExecutorService pool, final String expectedThread) - throws ExecutionException, InterruptedException { - final Task task = new ExceptionTask(); - try { - pool.submit(task).get(); - } catch (ExecutionException e) { - if (!e.getCause().getMessage().startsWith("ExceptionTask #")) { - LOG.error("Unexpected exception: ", e); - fail("Unexpected exception: " + e.getMessage()); - } - } - assertEquals("Thread name", expectedThread, task.executedBy); - } + private static class ThreadLocalTask implements Runnable { + static final ThreadLocal<String> threadLocalVariable = new ThreadLocal<String>(); - private void letThreadsDie() throws InterruptedException { - TimeUnit.MILLISECONDS.sleep(MAX_THREAD_AGE_MS * 2); - } + private final String newValue; + private volatile String oldValue; - private static class Task implements Runnable { - - private static int counter = 0; - - protected final int count; - - private final CountDownLatch mayFinish; - - protected String executedBy; - - Task() { - this(new CountDownLatch(0)); + public ThreadLocalTask(String newValue) { + this.newValue = newValue; } - Task(final CountDownLatch latch) { - this.mayFinish = latch; - this.count = counter++; - } - - @Override - public void run() { - mayFinish.countDown(); - final Thread thread = Thread.currentThread(); - try { - mayFinish.await(); - } catch (InterruptedException e) { - thread.interrupt(); - } - LOG.info("{} #{} running in thread {}", - new Object[] {getClass().getSimpleName(), count, thread}); - executedBy = thread.getName(); - } - } - - private static class ExceptionTask extends Task { @Override public void run() { - super.run(); - throw new RuntimeException("ExceptionTask #" + count); + oldValue = threadLocalVariable.get(); + // set thread local to a new value + threadLocalVariable.set(newValue); } - } - - private static class 
TrackingThreadFactory implements ThreadFactory { - - private final ThreadGroup group; - private final AtomicInteger threadCount = new AtomicInteger(0); - - private final List<Thread> threadHistory = new CopyOnWriteArrayList<Thread>(); - - public TrackingThreadFactory() { - group = Thread.currentThread().getThreadGroup(); - } - - public int getThreadCount() { - return threadHistory.size(); - } - - public Set<String> getActiveThreads() { - letThreadsDie(); - final HashSet<String> active = new HashSet<String>(); - for (final Thread thread : threadHistory) { - if (thread.isAlive()) { - active.add(thread.getName()); - } - } - return active; - } - - public Set<String> getExpiredThreads() { - letThreadsDie(); - final HashSet<String> expired = new HashSet<String>(); - for (final Thread thread : threadHistory) { - if (!thread.isAlive()) { - expired.add(thread.getName()); - } - } - return expired; - } - - /** - * This avoids a race condition where a thread has been evicted from the pool but is still alive becuase it evicted itself. - * JDK8 java.util.concurrent.ThreadPoolExecutor does this. The 15ms assumes the process takes no more than 15ms to complete. - * That is OS and VM dependent. 
- */ - public void letThreadsDie() { - try { - Thread.sleep(15); - } catch ( Exception e) { - LOG.debug(e.getMessage(),e); - } - } - - @Override - public Thread newThread(final Runnable r) { - final Thread thread = new Thread(group, r, "test-thread-" + threadCount.getAndIncrement()); - thread.setDaemon(false); - thread.setPriority(Thread.NORM_PRIORITY); - threadHistory.add(thread); - LOG.info("Created thread {}", thread.getName()); - return thread; - } - } - - public static class ThreadPoolContext extends ExternalResource { - - public TrackingThreadFactory getThreadFactory() { - return threadFactory; - } - - public ThreadExpiringThreadPool getPool() { - return pool; - } - - private TrackingThreadFactory threadFactory; - - private ThreadExpiringThreadPool pool; - - @Override - protected void before() throws Throwable { - Task.counter = 0; // reset counter - final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(20); - final RejectedExecutionHandler rejectionHandler = new ThreadPoolExecutor.AbortPolicy(); - threadFactory = new TrackingThreadFactory(); - pool = new ThreadExpiringThreadPool( - 1, 1, - MAX_THREAD_AGE_MS, TimeUnit.MILLISECONDS, - 1000, TimeUnit.MILLISECONDS, - queue, threadFactory, rejectionHandler); - } - - @Override - protected void after() { - threadFactory = null; - pool = null; + public String getOldValue() { + return oldValue; } } }