This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch 9.0.x in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/9.0.x by this push: new 9c801e41e8 Switch to Executor for async logging 9c801e41e8 is described below commit 9c801e41e85df0ea9a1ceb3108b4125d568c5f82 Author: Mark Thomas <ma...@apache.org> AuthorDate: Thu Aug 25 14:29:19 2022 +0100 Switch to Executor for async logging Replace logging thread for JULI's AsyncFileHandlerwith an executor to protect against failure of the logging thread. Based on pull request #545 by Piotr P. Karwasz. --- java/org/apache/juli/AsyncFileHandler.java | 172 +++++++++++---------- java/org/apache/juli/FileHandler.java | 86 +++++------ java/org/apache/juli/OneLineFormatter.java | 5 +- .../apache/juli/TestAsyncFileHandlerOverflow.java | 143 +++++++++++++++++ webapps/docs/changelog.xml | 5 + 5 files changed, 278 insertions(+), 133 deletions(-) diff --git a/java/org/apache/juli/AsyncFileHandler.java b/java/org/apache/juli/AsyncFileHandler.java index f76291c93c..d180dc738b 100644 --- a/java/org/apache/juli/AsyncFileHandler.java +++ b/java/org/apache/juli/AsyncFileHandler.java @@ -17,9 +17,13 @@ package org.apache.juli; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.LogRecord; + /** * A {@link FileHandler} implementation that uses a queue of log entries. * @@ -39,6 +43,8 @@ import java.util.logging.LogRecord; */ public class AsyncFileHandler extends FileHandler { + static final String THREAD_PREFIX = "AsyncFileHandlerWriter-"; + public static final int OVERFLOW_DROP_LAST = 1; public static final int OVERFLOW_DROP_FIRST = 2; public static final int OVERFLOW_DROP_FLUSH = 3; @@ -54,17 +60,12 @@ public class AsyncFileHandler extends FileHandler { System.getProperty("org.apache.juli.AsyncMaxRecordCount", Integer.toString(DEFAULT_MAX_RECORDS))); - protected static final LinkedBlockingDeque<LogEntry> queue = - new LinkedBlockingDeque<>(MAX_RECORDS); - - protected static final LoggerThread logger = new LoggerThread(); - - static { - logger.start(); - } + private static final LoggerExecutorService LOGGER_SERVICE = + new LoggerExecutorService(OVERFLOW_DROP_TYPE, MAX_RECORDS); private final Object closeLock = new Object(); protected volatile boolean closed = false; + private final LoggerExecutorService loggerService; public AsyncFileHandler() { this(null, null, null); @@ -75,7 +76,13 @@ public class AsyncFileHandler extends FileHandler { } public AsyncFileHandler(String directory, String prefix, String suffix, Integer maxDays) { + this(directory, prefix, suffix, maxDays, LOGGER_SERVICE); + } + + AsyncFileHandler(String directory, String prefix, String suffix, Integer maxDays, + LoggerExecutorService loggerService) { super(directory, prefix, suffix, maxDays); + this.loggerService = loggerService; open(); } @@ -90,7 +97,7 @@ public class AsyncFileHandler extends FileHandler { } closed = true; } - LoggerThread.deregisterHandler(); + loggerService.deregisterHandler(); super.close(); } @@ -105,11 +112,10 @@ public class AsyncFileHandler extends FileHandler { } closed = false; } - LoggerThread.registerHandler(); + loggerService.registerHandler(); super.open(); } - @Override public void publish(LogRecord record) { if (!isLoggable(record)) { @@ -118,58 +124,68 @@ public class AsyncFileHandler extends FileHandler { // fill source entries, before we hand the record over to another // thread with another class loader record.getSourceMethodName(); - LogEntry entry = new LogEntry(record, this); - boolean added = false; - try { - while (!added && !queue.offer(entry)) { - switch (OVERFLOW_DROP_TYPE) { - case OVERFLOW_DROP_LAST: { - //remove the last added element - queue.pollLast(); - break; - } - case OVERFLOW_DROP_FIRST: { - //remove the first element in the queue - queue.pollFirst(); - break; - } - case OVERFLOW_DROP_FLUSH: { - added = queue.offer(entry, 1000, TimeUnit.MILLISECONDS); - break; - } - case OVERFLOW_DROP_CURRENT: { - added = true; - break; - } - }//switch - }//while - } catch (InterruptedException x) { - // Allow thread to be interrupted and back out of the publish - // operation. No further action required. - } - + loggerService.execute(new Runnable() { + + @Override + public void run() { + /* + * During Tomcat shutdown, the Handlers are closed before the + * executor queue is flushed therefore the closed flag is + * ignored if the executor is shutting down. + */ + if (!closed || loggerService.isTerminating()) { + publishInternal(record); + } + } + }); } protected void publishInternal(LogRecord record) { super.publish(record); } - protected static class LoggerThread extends Thread { + + static class LoggerExecutorService extends ThreadPoolExecutor { + + private static final ThreadFactory THREAD_FACTORY = new ThreadFactory(THREAD_PREFIX); /* * Implementation note: Use of this count could be extended to - * start/stop the LoggerThread but that would require careful locking as - * the current size of the queue also needs to be taken into account and - * there are lost of edge cases when rapidly starting and stopping - * handlers. + * start/stop the LoggerExecutorService but that would require careful + * locking as the current size of the queue also needs to be taken into + * account and there are lost of edge cases when rapidly starting and + * stopping handlers. */ - private static final AtomicInteger handlerCount = new AtomicInteger(); + private final AtomicInteger handlerCount = new AtomicInteger(); + + public LoggerExecutorService(final int overflowDropType, final int maxRecords) { + super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(maxRecords), THREAD_FACTORY); + switch (overflowDropType) { + case OVERFLOW_DROP_LAST: + default: + setRejectedExecutionHandler(new DropLastPolicy()); + break; + case OVERFLOW_DROP_FIRST: + setRejectedExecutionHandler(new DiscardOldestPolicy()); + break; + case OVERFLOW_DROP_FLUSH: + setRejectedExecutionHandler(new DropFlushPolicy()); + break; + case OVERFLOW_DROP_CURRENT: + setRejectedExecutionHandler(new DiscardPolicy()); + } + } + + @Override + public LinkedBlockingDeque<Runnable> getQueue() { + return (LinkedBlockingDeque<Runnable>) super.getQueue(); + } - public static void registerHandler() { + public void registerHandler() { handlerCount.incrementAndGet(); } - public static void deregisterHandler() { + public void deregisterHandler() { int newCount = handlerCount.decrementAndGet(); if (newCount == 0) { try { @@ -179,54 +195,46 @@ public class AsyncFileHandler extends FileHandler { } catch (IllegalStateException ise) { // JVM is shutting down. // Allow up to 10s for for the queue to be emptied - int sleepCount = 0; - while (!AsyncFileHandler.queue.isEmpty() && sleepCount < 10000) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - // Ignore - } - sleepCount++; + shutdown(); + try { + awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // Ignore } + shutdownNow(); } } } + } - public LoggerThread() { - this.setDaemon(true); - this.setName("AsyncFileHandlerWriter-" + System.identityHashCode(this)); - } + + private static class DropFlushPolicy implements RejectedExecutionHandler { @Override - public void run() { + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { while (true) { + if (executor.isShutdown()) { + break; + } try { - LogEntry entry = queue.take(); - entry.flush(); - } catch (InterruptedException x) { - // Ignore the attempt to interrupt the thread. - } catch (Exception x) { - x.printStackTrace(); + if (executor.getQueue().offer(r, 1000, TimeUnit.MILLISECONDS)) { + break; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RejectedExecutionException("Interrupted", e); } } } } - protected static class LogEntry { - private final LogRecord record; - private final AsyncFileHandler handler; - public LogEntry(LogRecord record, AsyncFileHandler handler) { - super(); - this.record = record; - this.handler = handler; - } + private static class DropLastPolicy implements RejectedExecutionHandler { - public boolean flush() { - if (handler.closed) { - return false; - } else { - handler.publishInternal(record); - return true; + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + if (!executor.isShutdown()) { + ((LoggerExecutorService) executor).getQueue().pollLast(); + executor.execute(r); } } } diff --git a/java/org/apache/juli/FileHandler.java b/java/org/apache/juli/FileHandler.java index 974756aba1..43114597a5 100644 --- a/java/org/apache/juli/FileHandler.java +++ b/java/org/apache/juli/FileHandler.java @@ -37,7 +37,6 @@ import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -95,59 +94,13 @@ import java.util.regex.Pattern; */ public class FileHandler extends Handler { + public static final int DEFAULT_MAX_DAYS = -1; public static final int DEFAULT_BUFFER_SIZE = -1; private static final ExecutorService DELETE_FILES_SERVICE = - Executors.newSingleThreadExecutor(new ThreadFactory() { - private static final String NAME_PREFIX = "FileHandlerLogFilesCleaner-"; - private final boolean isSecurityEnabled; - private final ThreadGroup group; - private final AtomicInteger threadNumber = new AtomicInteger(1); - - { - SecurityManager s = System.getSecurityManager(); - if (s == null) { - this.isSecurityEnabled = false; - this.group = Thread.currentThread().getThreadGroup(); - } else { - this.isSecurityEnabled = true; - this.group = s.getThreadGroup(); - } - } - - @Override - public Thread newThread(Runnable r) { - ClassLoader loader = Thread.currentThread().getContextClassLoader(); - try { - // Threads should not be created by the webapp classloader - if (isSecurityEnabled) { - AccessController.doPrivileged((PrivilegedAction<Void>) () -> { - Thread.currentThread() - .setContextClassLoader(getClass().getClassLoader()); - return null; - }); - } else { - Thread.currentThread() - .setContextClassLoader(getClass().getClassLoader()); - } - Thread t = new Thread(group, r, - NAME_PREFIX + threadNumber.getAndIncrement()); - t.setDaemon(true); - return t; - } finally { - if (isSecurityEnabled) { - AccessController.doPrivileged((PrivilegedAction<Void>) () -> { - Thread.currentThread().setContextClassLoader(loader); - return null; - }); - } else { - Thread.currentThread().setContextClassLoader(loader); - } - } - } - }); + Executors.newSingleThreadExecutor(new ThreadFactory("FileHandlerLogFilesCleaner-")); // ------------------------------------------------------------ Constructor @@ -587,4 +540,39 @@ public class FileHandler extends Handler { return null; } } + + protected static final class ThreadFactory implements java.util.concurrent.ThreadFactory { + private final String namePrefix; + private final boolean isSecurityEnabled; + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + + public ThreadFactory(final String namePrefix) { + this.namePrefix = namePrefix; + SecurityManager s = System.getSecurityManager(); + if (s == null) { + this.isSecurityEnabled = false; + this.group = Thread.currentThread().getThreadGroup(); + } else { + this.isSecurityEnabled = true; + this.group = s.getThreadGroup(); + } + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement()); + // Threads should not have as context classloader a webapp classloader + if (isSecurityEnabled) { + AccessController.doPrivileged((PrivilegedAction<Void>) () -> { + t.setContextClassLoader(ThreadFactory.class.getClassLoader()); + return null; + }); + } else { + t.setContextClassLoader(ThreadFactory.class.getClassLoader()); + } + t.setDaemon(true); + return t; + } + } } diff --git a/java/org/apache/juli/OneLineFormatter.java b/java/org/apache/juli/OneLineFormatter.java index 7419d8b4c8..374e9119c5 100644 --- a/java/org/apache/juli/OneLineFormatter.java +++ b/java/org/apache/juli/OneLineFormatter.java @@ -132,12 +132,13 @@ public class OneLineFormatter extends Formatter { // Thread sb.append(' '); sb.append('['); - if (Thread.currentThread() instanceof AsyncFileHandler.LoggerThread) { + final String threadName = Thread.currentThread().getName(); + if (threadName != null && threadName.startsWith(AsyncFileHandler.THREAD_PREFIX)) { // If using the async handler can't get the thread name from the // current thread. sb.append(getThreadName(record.getThreadID())); } else { - sb.append(Thread.currentThread().getName()); + sb.append(threadName); } sb.append(']'); diff --git a/test/org/apache/juli/TestAsyncFileHandlerOverflow.java b/test/org/apache/juli/TestAsyncFileHandlerOverflow.java new file mode 100644 index 0000000000..bf67dc6ddc --- /dev/null +++ b/test/org/apache/juli/TestAsyncFileHandlerOverflow.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.juli; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.LocalDate; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Formatter; +import java.util.logging.LogRecord; +import java.util.logging.Logger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import org.apache.juli.AsyncFileHandler.LoggerExecutorService; + +@RunWith(Parameterized.class) +public class TestAsyncFileHandlerOverflow { + + private static final String PREFIX = "TestAsyncFileHandler."; + private static final String SUFFIX = ".log"; + private static final Logger logger = Logger.getLogger(TestAsyncFileHandlerOverflow.class.getName()); + { + logger.setUseParentHandlers(false); + } + + @Parameters + public static Collection<Object[]> parameters() { + return Arrays.asList(new Object[][] { + { Integer.valueOf(AsyncFileHandler.OVERFLOW_DROP_LAST), "START\n1\n3\n" }, + { Integer.valueOf(AsyncFileHandler.OVERFLOW_DROP_FIRST), "START\n2\n3\n" }, + { Integer.valueOf(AsyncFileHandler.OVERFLOW_DROP_FLUSH), "START\n1\n2\n3\n" }, + { Integer.valueOf(AsyncFileHandler.OVERFLOW_DROP_CURRENT), "START\n1\n2\n" } }); + } + + private final CountDownLatch latch = new CountDownLatch(1); + private Path logsDir; + private LoggerExecutorService loggerService; + private AsyncFileHandler handler; + + private final int overflowDropType; + private final String expected; + + public TestAsyncFileHandlerOverflow(final int overflowDropType, final String expected) { + this.overflowDropType = overflowDropType; + this.expected = expected; + } + + @Before + public void setUp() throws IOException { + final Path logsBase = Paths.get(System.getProperty("tomcat.test.temp", "output/tmp")); + Files.createDirectories(logsBase); + this.logsDir = Files.createTempDirectory(logsBase, "test"); + final Formatter formatter = new Formatter() { + + @Override + public String format(LogRecord record) { + return record.getMessage() + "\n"; + } + }; + // Setup an executor that blocks until the first rejection + this.loggerService = new LoggerExecutorService(overflowDropType, 2) { + + @Override + protected void beforeExecute(Thread t, Runnable r) { + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + super.beforeExecute(t, r); + } + }; + final RejectedExecutionHandler rejectionHandler = loggerService.getRejectedExecutionHandler(); + loggerService.setRejectedExecutionHandler(new RejectedExecutionHandler() { + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + latch.countDown(); + rejectionHandler.rejectedExecution(r, executor); + } + }); + this.handler = new AsyncFileHandler(logsDir.toString(), PREFIX, SUFFIX, Integer.valueOf(1), loggerService); + handler.setFormatter(formatter); + logger.addHandler(handler); + handler.open(); + } + + @After + public void cleanUp() { + handler.close(); + logger.removeHandler(handler); + } + + @Test + public void testOverFlow() throws IOException, InterruptedException { + handler.open(); + logger.warning("START"); // blocks async thread + // these are queued + logger.warning("1"); + logger.warning("2"); + logger.warning("3"); // overflows executor and unblocks aync thread + loggerService.shutdown(); + // after shutdown was issued + logger.warning("IGNORE"); + + loggerService.awaitTermination(1, TimeUnit.SECONDS); + final ByteArrayOutputStream os = new ByteArrayOutputStream(); + Files.copy(logsDir.resolve(PREFIX + LocalDate.now() + SUFFIX), os); + final String actual = new String(os.toByteArray(), StandardCharsets.UTF_8); + Assert.assertEquals(expected, actual); + handler.close(); + } +} \ No newline at end of file diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index 09dc588eb4..a774dfef2b 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -158,6 +158,11 @@ contain the duplicates. Based on pull request <pr>535</pr> by Mads Rolsdorph. (markt) </fix> + <fix> + Replace logging thread for JULI's <code>AsyncFileHandler</code> with an + executor to protect against failure of the logging thread. Based on pull + request <pr>545</pr> by Piotr P. Karwasz. (markt) + </fix> </changelog> </subsection> <subsection name="Coyote"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org