This is an automated email from the ASF dual-hosted git repository. rombert pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-commons-threads.git
commit 69c0e9ff0dcbb6e4e24ced6fdde4919816169a81 Author: Robert Munteanu <[email protected]> AuthorDate: Wed Jan 24 23:11:54 2018 +0200 SLING-7432 - Thread pool clean up code can lead to infinite loops in ThreadLocal.get Stop using thread locals, as they might interfere with the save/restore logic. Got no more hangs in ThreadLocal code after this change. --- .../commons/threads/impl/ThreadLocalCleaner.java | 220 ++++++++++++--------- .../ThreadPoolExecutorCleaningThreadLocals.java | 16 +- 2 files changed, 137 insertions(+), 99 deletions(-) diff --git a/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java b/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java index b755034..5a85f72 100644 --- a/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java +++ b/src/main/java/org/apache/sling/commons/threads/impl/ThreadLocalCleaner.java @@ -21,12 +21,17 @@ import java.lang.reflect.Field; import java.util.Arrays; import org.apache.sling.commons.threads.impl.ThreadLocalChangeListener.Mode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Notifies a {@link ThreadLocalChangeListener} about changes on a thread local storage. In addition it removes all references to variables * being added to the thread local storage while the cleaner was running with its {@link cleanup} method. * * @see <a href="http://www.javaspecialists.eu/archive/Issue229.html">JavaSpecialist.eu - Cleaning ThreadLocals</a> */ public class ThreadLocalCleaner { + + private static final Logger LOG = LoggerFactory.getLogger(ThreadLocalCleaner.class); + private final ThreadLocalChangeListener listener; /* Reflection fields */ @@ -47,15 +52,6 @@ public class ThreadLocalCleaner { private static Field threadLocalMapThresholdField; private static volatile IllegalStateException reflectionException; - - public ThreadLocalCleaner(ThreadLocalChangeListener listener) { - if (threadLocalsField == null || reflectionException != null) { - initReflectionFields(); - } - this.listener = listener; - saveOldThreadLocals(); - } - private static synchronized void initReflectionFields() throws IllegalStateException { // check if previous initialization lead to an exception if (reflectionException != null) { @@ -80,11 +76,89 @@ public class ThreadLocalCleaner { } } } + + /** @param c the class containing the field + * @param name the name of the field + * @return the field from the given class with the given name (made accessible) + * @throws NoSuchFieldException */ + private static Field field(Class<?> c, String name) + throws NoSuchFieldException { + Field field = c.getDeclaredField(name); + field.setAccessible(true); + return field; + } + + /** @param clazz the class containing the inner class + * @param name the name of the inner class + * @return the class with the given name, declared as inner class of the given class */ + private static Class<?> inner(Class<?> clazz, String name) { + for (Class<?> c : clazz.getDeclaredClasses()) { + if (c.getSimpleName().equals(name)) { + return c; + } + } + throw new IllegalStateException( + "Could not find inner class " + name + " in " + clazz); + } + + private static Reference<?>[] copy(Field field) { + try { + Thread thread = Thread.currentThread(); + Object threadLocals = field.get(thread); + if (threadLocals == null) + return null; + Reference<?>[] table = (Reference<?>[]) tableField.get(threadLocals); + return Arrays.copyOf(table, table.length); + } catch (IllegalAccessException e) { + throw new IllegalStateException("Access denied", e); + } + } + + private static Integer size(Field field, Field sizeField) { + try { + Thread thread = Thread.currentThread(); + Object threadLocals = field.get(thread); + if (threadLocals == null) + return null; + return (Integer) sizeField.get(threadLocals); + } catch (IllegalAccessException e) { + throw new IllegalStateException("Access denied", e); + } + } + + private ThreadLocalMapCopy threadLocalsCopy; + private ThreadLocalMapCopy inheritableThreadLocalsCopy; + + private static void restore(Field field, Object[] value, Integer size, Integer threshold) { + try { + Thread thread = Thread.currentThread(); + if (value == null) { + field.set(thread, null); + LOG.debug("Restored {} to a null value", field.getName()); + } else { + final Object threadLocals = field.get(thread); + tableField.set(threadLocals, value); + threadLocalMapSizeField.set(threadLocals, size); + threadLocalMapThresholdField.set(threadLocals, threshold); + LOG.debug("Restored {} with to {} references, size {}, threshold {}" ,field.getName(), value.length, size, threshold); + } + } catch (IllegalAccessException e) { + throw new IllegalStateException("Access denied", e); + } + } + + public ThreadLocalCleaner(ThreadLocalChangeListener listener) { + if (threadLocalsField == null || reflectionException != null) { + initReflectionFields(); + } + this.listener = listener; + saveOldThreadLocals(); + } public void cleanup() { // the first two diff calls are only to notify the listener, the actual cleanup is done by restoreOldThreadLocals - diff(threadLocalsField, copyOfThreadLocals.get()); - diff(inheritableThreadLocalsField, copyOfInheritableThreadLocals.get()); + diff(threadLocalsField, threadLocalsCopy.references); + diff(inheritableThreadLocalsField, inheritableThreadLocalsCopy.references); restoreOldThreadLocals(); } @@ -114,8 +188,8 @@ public class ThreadLocalCleaner { // nested loop - both arrays *should* be relatively small next: for (Reference<?> curRef : current) { if (curRef != null) { - if (curRef.get() == copyOfThreadLocals || - curRef.get() == copyOfInheritableThreadLocals) { + if (curRef.get() == this.threadLocalsCopy || + curRef.get() == this.inheritableThreadLocalsCopy) { continue next; } for (Reference<?> backupRef : backup) { @@ -151,96 +225,54 @@ public class ThreadLocalCleaner { } } - /** @param c the class containing the field - * @param name the name of the field - * @return the field from the given class with the given name (made accessible) - * @throws NoSuchFieldException */ - private static Field field(Class<?> c, String name) - throws NoSuchFieldException { - Field field = c.getDeclaredField(name); - field.setAccessible(true); - return field; + private void saveOldThreadLocals() { + + threadLocalsCopy = new ThreadLocalMapCopy(copy(threadLocalsField), + size(threadLocalsField, threadLocalMapSizeField), + size(threadLocalsField, threadLocalMapThresholdField)); + threadLocalsCopy.debug("saved", "Thread locals"); + + inheritableThreadLocalsCopy = new ThreadLocalMapCopy(copy(inheritableThreadLocalsField), + size(inheritableThreadLocalsField, threadLocalMapSizeField), + size(inheritableThreadLocalsField, threadLocalMapThresholdField)); + inheritableThreadLocalsCopy.debug("saved", "Inheritable thread locals"); } - /** @param clazz the class containing the inner class - * @param name the name of the inner class - * @return the class with the given name, declared as inner class of the given class */ - private static Class<?> inner(Class<?> clazz, String name) { - for (Class<?> c : clazz.getDeclaredClasses()) { - if (c.getSimpleName().equals(name)) { - return c; - } - } - throw new IllegalStateException( - "Could not find inner class " + name + " in " + clazz); - } - - private static final ThreadLocal<Reference<?>[]> copyOfThreadLocals = new ThreadLocal<>(); - private static final ThreadLocal<Integer> copyOfThreadLocalsSize = new ThreadLocal<>(); - private static final ThreadLocal<Integer> copyOfThreadLocalsThreshold = new ThreadLocal<>(); - private static final ThreadLocal<Reference<?>[]> copyOfInheritableThreadLocals = new ThreadLocal<>(); - private static final ThreadLocal<Integer> copyOfInheritableThreadLocalsSize = new ThreadLocal<>(); - private static final ThreadLocal<Integer> copyOfInheritableThreadLocalsThreshold = new ThreadLocal<>(); - - private static void saveOldThreadLocals() { - copyOfThreadLocals.set(copy(threadLocalsField)); - copyOfThreadLocalsSize.set(size(threadLocalsField, threadLocalMapSizeField)); - copyOfThreadLocalsThreshold.set(size(threadLocalsField, threadLocalMapThresholdField)); - copyOfInheritableThreadLocals.set(copy(inheritableThreadLocalsField)); - copyOfInheritableThreadLocalsSize.set(size(inheritableThreadLocalsField, threadLocalMapSizeField)); - copyOfInheritableThreadLocalsThreshold.set(size(inheritableThreadLocalsField, threadLocalMapThresholdField)); - } - - private static Reference<?>[] copy(Field field) { + private void restoreOldThreadLocals() { try { - Thread thread = Thread.currentThread(); - Object threadLocals = field.get(thread); - if (threadLocals == null) - return null; - Reference<?>[] table = (Reference<?>[]) tableField.get(threadLocals); - return Arrays.copyOf(table, table.length); - } catch (IllegalAccessException e) { - throw new IllegalStateException("Access denied", e); - } - } - - private static Integer size(Field field, Field sizeField) { - try { - Thread thread = Thread.currentThread(); - Object threadLocals = field.get(thread); - if (threadLocals == null) - return null; - return (Integer) sizeField.get(threadLocals); - } catch (IllegalAccessException e) { - throw new IllegalStateException("Access denied", e); - } - } - - private static void restoreOldThreadLocals() { - try { - restore(inheritableThreadLocalsField, copyOfInheritableThreadLocals.get(), - copyOfInheritableThreadLocalsSize.get(), copyOfInheritableThreadLocalsThreshold.get()); - restore(threadLocalsField, copyOfThreadLocals.get(), - copyOfThreadLocalsSize.get(), copyOfThreadLocalsThreshold.get()); + restore(inheritableThreadLocalsField, inheritableThreadLocalsCopy.references, + inheritableThreadLocalsCopy.size, inheritableThreadLocalsCopy.threshold); + restore(threadLocalsField, threadLocalsCopy.references, + threadLocalsCopy.size, threadLocalsCopy.threshold); } finally { - copyOfThreadLocals.remove(); - copyOfInheritableThreadLocals.remove(); + threadLocalsCopy = null; + inheritableThreadLocalsCopy = null; } } - private static void restore(Field field, Object value, Integer size, Integer threshold) { - try { - Thread thread = Thread.currentThread(); - if (value == null) { - field.set(thread, null); + /** + * Helper class that encapsulates the state from a <tt>ThreadLocalMap</tt> + * + */ + static class ThreadLocalMapCopy { + + private final Reference<?>[] references; + private final Integer size; + private final Integer threshold; + + private ThreadLocalMapCopy(Reference<?>[] references, Integer size, Integer threshold) { + this.references = references; + this.size = size; + this.threshold = threshold; + } + + void debug(String event, String mapName) { + if ( references != null ) { + ThreadLocalCleaner.LOG.debug("{}: {} {} references, size: {}, threshold: {}", + mapName, event, references.length, size, threshold); } else { - final Object threadLocals = field.get(thread); - tableField.set(threadLocals, value); - threadLocalMapSizeField.set(threadLocals, size); - threadLocalMapThresholdField.set(threadLocals, threshold); + ThreadLocalCleaner.LOG.debug("{}: {} null references", mapName, event); } - } catch (IllegalAccessException e) { - throw new IllegalStateException("Access denied", e); } } } \ No newline at end of file diff --git a/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java b/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java index 922f18b..8f7a9a6 100644 --- a/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java +++ b/src/main/java/org/apache/sling/commons/threads/impl/ThreadPoolExecutorCleaningThreadLocals.java @@ -18,6 +18,8 @@ package org.apache.sling.commons.threads.impl; import java.lang.invoke.MethodHandles; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -35,6 +37,8 @@ public class ThreadPoolExecutorCleaningThreadLocals extends ThreadPoolExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final ConcurrentMap<Thread, ThreadLocalCleaner> cleaners = new ConcurrentHashMap<>(); + public ThreadPoolExecutorCleaningThreadLocals(int corePoolSize, int maximumPoolSize, long keepAliveTime, @@ -48,26 +52,28 @@ public class ThreadPoolExecutorCleaningThreadLocals extends ThreadPoolExecutor { this.listener = listener; } - private static final ThreadLocal<ThreadLocalCleaner> local = new ThreadLocal<>(); - protected void beforeExecute(Thread t, Runnable r) { LOGGER.debug("Collecting changes to ThreadLocal for thread {} from now on...", t); try { ThreadLocalCleaner cleaner = new ThreadLocalCleaner(listener); - local.set(cleaner); + cleaners.put(t, cleaner); } catch (Throwable e) { LOGGER.warn("Could not set up thread local cleaner (most probably not a compliant JRE): {}", e, e); } + + super.beforeExecute(t, r); } protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + LOGGER.debug("Cleaning up thread locals for thread {}...", Thread.currentThread()); - ThreadLocalCleaner cleaner = local.get(); + ThreadLocalCleaner cleaner = cleaners.remove(Thread.currentThread()); + if (cleaner != null) { cleaner.cleanup(); } else { LOGGER.warn("Could not clean up thread locals in thread {} as the cleaner was not set up correctly", Thread.currentThread()); } - local.remove(); } } -- To stop receiving notification emails like this one, please contact [email protected].
