This is an automated email from the ASF dual-hosted git repository. vy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git
commit 6bf68b73b322b9b52e2ff7d969133fb816f57d38 Author: Volkan Yazıcı <[email protected]> AuthorDate: Sat Jan 9 15:58:29 2021 +0100 LOG4J2-2972 Refactor AsyncAppender and AppenderControl for handling of Throwables. (#452) --- .../org/apache/logging/log4j/util/SneakyThrow.java | 13 -- .../logging/log4j/core/appender/AsyncAppender.java | 136 ++-------------- .../appender/AsyncAppenderEventDispatcher.java | 174 +++++++++++++++++++++ .../logging/log4j/core/config/AppenderControl.java | 2 +- .../async/AsyncAppenderExceptionHandlingTest.java | 9 +- .../log4j/test/appender/FailOnceAppender.java | 15 +- 6 files changed, 209 insertions(+), 140 deletions(-) diff --git a/log4j-api/src/test/java/org/apache/logging/log4j/util/SneakyThrow.java b/log4j-api/src/test/java/org/apache/logging/log4j/util/SneakyThrow.java deleted file mode 100644 index af26e6e..0000000 --- a/log4j-api/src/test/java/org/apache/logging/log4j/util/SneakyThrow.java +++ /dev/null @@ -1,13 +0,0 @@ -package org.apache.logging.log4j.util; - -public enum SneakyThrow {; - - /** - * Throws any exception (including checked ones!) without defining it in the method signature. - */ - @SuppressWarnings("unchecked") - public static <E extends Throwable> void sneakyThrow(final Throwable throwable) throws E { - throw (E) throwable; - } - -} diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppender.java index d4a270b..141eb95 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppender.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppender.java @@ -16,7 +16,6 @@ */ package org.apache.logging.log4j.core.appender; -import org.apache.logging.log4j.core.AbstractLogEvent; import org.apache.logging.log4j.core.Appender; import org.apache.logging.log4j.core.Core; import org.apache.logging.log4j.core.Filter; @@ -37,7 +36,6 @@ import org.apache.logging.log4j.core.config.Property; import org.apache.logging.log4j.core.config.plugins.PluginConfiguration; import org.apache.logging.log4j.core.filter.AbstractFilterable; import org.apache.logging.log4j.core.impl.Log4jLogEvent; -import org.apache.logging.log4j.core.util.Log4jThread; import org.apache.logging.log4j.plugins.Plugin; import org.apache.logging.log4j.plugins.PluginAliases; import org.apache.logging.log4j.plugins.PluginBuilderAttribute; @@ -52,7 +50,6 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TransferQueue; -import java.util.concurrent.atomic.AtomicLong; /** * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an @@ -63,11 +60,6 @@ import java.util.concurrent.atomic.AtomicLong; public final class AsyncAppender extends AbstractAppender { private static final int DEFAULT_QUEUE_SIZE = 1024; - private static final LogEvent SHUTDOWN_LOG_EVENT = new AbstractLogEvent() { - // empty - }; - - private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1); private final BlockingQueue<LogEvent> queue; private final int queueSize; @@ -78,13 +70,13 @@ public final class AsyncAppender extends AbstractAppender { private final String errorRef; private final boolean includeLocation; private AppenderControl errorAppender; - private AsyncThread thread; + private AsyncAppenderEventDispatcher dispatcher; private AsyncQueueFullPolicy asyncQueueFullPolicy; private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs, final String errorRef, final int queueSize, final boolean blocking, final boolean ignoreExceptions, final long shutdownTimeout, final Configuration config, final boolean includeLocation, - final BlockingQueueFactory<LogEvent> blockingQueueFactory, Property[] properties) { + final BlockingQueueFactory<LogEvent> blockingQueueFactory, final Property[] properties) { super(name, filter, null, ignoreExceptions, properties); this.queue = blockingQueueFactory.create(queueSize); this.queueSize = queueSize; @@ -117,14 +109,14 @@ public final class AsyncAppender extends AbstractAppender { } } if (appenders.size() > 0) { - thread = new AsyncThread(appenders, queue); - thread.setName("AsyncAppender-" + getName()); + dispatcher = new AsyncAppenderEventDispatcher( + getName(), errorAppender, appenders, queue); } else if (errorRef == null) { throw new ConfigurationException("No appenders are available for AsyncAppender " + getName()); } asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create(); - thread.start(); + dispatcher.start(); super.start(); } @@ -133,10 +125,11 @@ public final class AsyncAppender extends AbstractAppender { setStopping(); super.stop(timeout, timeUnit, false); LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size()); - thread.shutdown(); try { - thread.join(shutdownTimeout); - } catch (final InterruptedException ex) { + dispatcher.stop(shutdownTimeout); + } catch (final InterruptedException ignored) { + // Restore the interrupted flag cleared when the exception is caught. + Thread.currentThread().interrupt(); LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName()); } LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size()); @@ -169,7 +162,7 @@ public final class AsyncAppender extends AbstractAppender { logMessageInCurrentThread(logEvent); } else { // delegate to the event router (which may discard, enqueue and block, or log in current thread) - final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel()); + final EventRoute route = asyncQueueFullPolicy.getRoute(dispatcher.getId(), memento.getLevel()); route.logMessage(this, memento); } } else { @@ -192,8 +185,7 @@ public final class AsyncAppender extends AbstractAppender { */ public void logMessageInCurrentThread(final LogEvent logEvent) { logEvent.setEndOfBatch(queue.isEmpty()); - final boolean appendSuccessful = thread.callAppenders(logEvent); - logToErrorAppenderIfNecessary(appendSuccessful, logEvent); + dispatcher.dispatch(logEvent); } /** @@ -205,7 +197,7 @@ public final class AsyncAppender extends AbstractAppender { try { // wait for free slots in the queue queue.put(logEvent); - } catch (final InterruptedException e) { + } catch (final InterruptedException ignored) { final boolean appendSuccessful = handleInterruptedException(logEvent); logToErrorAppenderIfNecessary(appendSuccessful, logEvent); } @@ -245,7 +237,7 @@ public final class AsyncAppender extends AbstractAppender { } public static class Builder<B extends Builder<B>> extends AbstractFilterable.Builder<B> - implements org.apache.logging.log4j.plugins.util.Builder<AsyncAppender> { + implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> { @PluginElement("AppenderRef") @Required(message = "No appender references provided to AsyncAppender") @@ -338,103 +330,6 @@ public final class AsyncAppender extends AbstractAppender { } /** - * Thread that calls the Appenders. - */ - private class AsyncThread extends Log4jThread { - - private volatile boolean shutdown; - private final List<AppenderControl> appenders; - private final BlockingQueue<LogEvent> queue; - - public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) { - super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement()); - this.appenders = appenders; - this.queue = queue; - setDaemon(true); - } - - @Override - public void run() { - while (!shutdown) { - LogEvent event; - try { - event = queue.take(); - if (event == SHUTDOWN_LOG_EVENT) { - shutdown = true; - continue; - } - } catch (final InterruptedException ex) { - break; // LOG4J2-830 - } - event.setEndOfBatch(queue.isEmpty()); - final boolean success = callAppenders(event); - if (!success && errorAppender != null) { - try { - errorAppender.callAppender(event); - } catch (final Exception ex) { - // Silently accept the error. - } - } - } - // Process any remaining items in the queue. - LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.", - queue.size()); - int count = 0; - int ignored = 0; - while (!queue.isEmpty()) { - try { - final LogEvent event = queue.take(); - if (event instanceof Log4jLogEvent) { - final Log4jLogEvent logEvent = (Log4jLogEvent) event; - logEvent.setEndOfBatch(queue.isEmpty()); - callAppenders(logEvent); - count++; - } else { - ignored++; - LOGGER.trace("Ignoring event of class {}", event.getClass().getName()); - } - } catch (final InterruptedException ex) { - // May have been interrupted to shut down. - // Here we ignore interrupts and try to process all remaining events. - } - } - LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. " - + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored); - } - - /** - * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl} - * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any - * exceptions are silently ignored. - * - * @param event the event to forward to the registered appenders - * @return {@code true} if at least one appender call succeeded, {@code false} otherwise - */ - boolean callAppenders(final LogEvent event) { - boolean success = false; - for (final AppenderControl control : appenders) { - try { - control.callAppender(event); - success = true; - } catch (final Exception ex) { - // If no appender is successful the error appender will get it. - } - } - return success; - } - - public void shutdown() { - shutdown = true; - if (queue.isEmpty()) { - queue.offer(SHUTDOWN_LOG_EVENT); - } - if (getState() == State.TIMED_WAITING || getState() == State.WAITING) { - this.interrupt(); // LOG4J2-1422: if underlying appender is stuck in wait/sleep/join/park call - } - } - } - - /** * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings. * * @return the names of the sink appenders @@ -486,11 +381,12 @@ public final class AsyncAppender extends AbstractAppender { /** * Returns the number of elements in the queue. - * - * @return the number of elements in the queue. + * + * @return the number of elements in the queue. * @since 2.11.1 */ public int getQueueSize() { return queue.size(); } + } diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java new file mode 100644 index 0000000..a01f68d --- /dev/null +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.logging.log4j.core.appender; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.config.AppenderControl; +import org.apache.logging.log4j.core.impl.Log4jLogEvent; +import org.apache.logging.log4j.core.util.Log4jThread; +import org.apache.logging.log4j.status.StatusLogger; + +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +class AsyncAppenderEventDispatcher extends Log4jThread { + + private static final LogEvent STOP_EVENT = new Log4jLogEvent(); + + private static final AtomicLong THREAD_COUNTER = new AtomicLong(0); + + private static final Logger LOGGER = StatusLogger.getLogger(); + + private final AppenderControl errorAppender; + + private final List<AppenderControl> appenders; + + private final BlockingQueue<LogEvent> queue; + + private final AtomicBoolean stoppedRef; + + AsyncAppenderEventDispatcher( + final String name, + final AppenderControl errorAppender, + final List<AppenderControl> appenders, + final BlockingQueue<LogEvent> queue) { + super("AsyncAppenderEventDispatcher-" + THREAD_COUNTER.incrementAndGet() + "-" + name); + this.errorAppender = errorAppender; + this.appenders = appenders; + this.queue = queue; + this.stoppedRef = new AtomicBoolean(false); + } + + @Override + public void run() { + LOGGER.trace("{} has started.", getName()); + dispatchAll(); + dispatchRemaining(); + } + + private void dispatchAll() { + while (!stoppedRef.get()) { + LogEvent event; + try { + event = queue.take(); + } catch (final InterruptedException ignored) { + // Restore the interrupted flag cleared when the exception is caught. + interrupt(); + break; + } + if (event == STOP_EVENT) { + break; + } + event.setEndOfBatch(queue.isEmpty()); + dispatch(event); + } + LOGGER.trace("{} has stopped.", getName()); + } + + private void dispatchRemaining() { + int eventCount = 0; + while (true) { + // Note the non-blocking Queue#poll() method! + final LogEvent event = queue.poll(); + if (event == null) { + break; + } + // Allow events that managed to be submitted after the sentinel. + if (event == STOP_EVENT) { + continue; + } + event.setEndOfBatch(queue.isEmpty()); + dispatch(event); + eventCount++; + } + LOGGER.trace( + "{} has processed the last {} remaining event(s).", + getName(), eventCount); + } + + /** + * Dispatches the given {@code event} to the registered appenders <b>in the + * current thread</b>. + */ + void dispatch(final LogEvent event) { + + // Dispatch the event to all registered appenders. + boolean succeeded = false; + // noinspection ForLoopReplaceableByForEach (avoid iterator instantion) + for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) { + final AppenderControl control = appenders.get(appenderIndex); + try { + control.callAppender(event); + succeeded = true; + } catch (final Throwable error) { + // If no appender is successful, the error appender will get it. + // It is okay to simply log it here. + LOGGER.trace( + "{} has failed to call appender {}", + getName(), control.getAppenderName(), error); + } + } + + // Fallback to the error appender if none has succeeded so far. + if (!succeeded && errorAppender != null) { + try { + errorAppender.callAppender(event); + } catch (final Throwable error) { + // If the error appender also fails, there is nothing further + // we can do about it. + LOGGER.trace( + "{} has failed to call the error appender {}", + getName(), errorAppender.getAppenderName(), error); + } + } + + } + + void stop(final long timeoutMillis) throws InterruptedException { + + // Mark the completion, if necessary. + final boolean stopped = stoppedRef.compareAndSet(false, true); + if (stopped) { + LOGGER.trace("{} is signaled to stop.", getName()); + } + + // There is a slight chance that the thread is not started yet, wait for + // it to run. Otherwise, interrupt+join might block. + // noinspection StatementWithEmptyBody + while (Thread.State.NEW.equals(getState())); + + // Enqueue the stop event, if there is sufficient room; otherwise, + // fallback to interruption. (We should avoid interrupting the thread if + // at all possible due to the subtleties of Java interruption, which + // will actually close sockets if any blocking operations are in + // progress! This means a socket appender may surprisingly fail to + // deliver final events. I recall some oddities with file I/O as well. + // — ckozak) + final boolean added = queue.offer(STOP_EVENT); + if (!added) { + interrupt(); + } + + // Wait for the completion. + join(timeoutMillis); + + } + +} diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/config/AppenderControl.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/config/AppenderControl.java index 1e71afa..a4d00d3 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/config/AppenderControl.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/config/AppenderControl.java @@ -156,7 +156,7 @@ public class AppenderControl extends AbstractFilterable { appender.append(event); } catch (final RuntimeException error) { handleAppenderError(event, error); - } catch (final Throwable error) { + } catch (final Exception error) { handleAppenderError(event, new AppenderLoggingException(error)); } } diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/AsyncAppenderExceptionHandlingTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/AsyncAppenderExceptionHandlingTest.java index 8318c81..164bdca 100644 --- a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/AsyncAppenderExceptionHandlingTest.java +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/AsyncAppenderExceptionHandlingTest.java @@ -39,10 +39,10 @@ import java.util.stream.Collectors; * Verifies {@link AsyncAppender} works after certain type of {@link Appender} * failures. * <p> - * {@link AsyncAppender} thread is known to get killed due to + * {@code AsyncAppender} thread is known to get killed due to * {@link AppenderControl} leaking exceptions in the past. This class is more - * of an end-to-end test to verify that {@link AppenderControl} catches all kind - * of {@link Throwable}s. + * of an end-to-end test to verify that {@code AsyncAppender} still works even + * if the background thread gets killed. */ class AsyncAppenderExceptionHandlingTest { @@ -52,7 +52,8 @@ class AsyncAppenderExceptionHandlingTest { FailOnceAppender.ThrowableClassName.LOGGING_EXCEPTION, FailOnceAppender.ThrowableClassName.EXCEPTION, FailOnceAppender.ThrowableClassName.ERROR, - FailOnceAppender.ThrowableClassName.THROWABLE + FailOnceAppender.ThrowableClassName.THROWABLE, + FailOnceAppender.ThrowableClassName.THREAD_DEATH }) void AsyncAppender_should_not_stop_on_appender_failures(String throwableClassName) { diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/test/appender/FailOnceAppender.java b/log4j-core/src/test/java/org/apache/logging/log4j/test/appender/FailOnceAppender.java index 7fe0296..6a8f5f2 100644 --- a/log4j-core/src/test/java/org/apache/logging/log4j/test/appender/FailOnceAppender.java +++ b/log4j-core/src/test/java/org/apache/logging/log4j/test/appender/FailOnceAppender.java @@ -23,9 +23,9 @@ import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.logging.log4j.core.config.Property; import org.apache.logging.log4j.core.config.plugins.PluginAttribute; import org.apache.logging.log4j.core.config.plugins.PluginFactory; +import org.apache.logging.log4j.core.util.Throwables; import org.apache.logging.log4j.plugins.Plugin; import org.apache.logging.log4j.plugins.validation.constraints.Required; -import org.apache.logging.log4j.util.SneakyThrow; import java.util.ArrayList; import java.util.List; @@ -53,7 +53,7 @@ public class FailOnceAppender extends AbstractAppender { if (!failed) { failed = true; Throwable throwable = throwableSupplier.get(); - SneakyThrow.sneakyThrow(throwable); + Throwables.rethrow(throwable); } events.add(event); } @@ -95,11 +95,20 @@ public class FailOnceAppender extends AbstractAppender { case ThrowableClassName.EXCEPTION: return () -> new Exception(message); case ThrowableClassName.ERROR: return () -> new Error(message); case ThrowableClassName.THROWABLE: return () -> new Throwable(message); + case ThrowableClassName.THREAD_DEATH: return () -> { + stopCurrentThread(); + throw new IllegalStateException("should not have reached here"); + }; default: throw new IllegalArgumentException("unknown throwable class name: " + throwableClassName); } } + @SuppressWarnings("deprecation") + private static void stopCurrentThread() { + Thread.currentThread().stop(); + } + public enum ThrowableClassName {; public static final String RUNTIME_EXCEPTION = "RuntimeException"; @@ -112,6 +121,8 @@ public class FailOnceAppender extends AbstractAppender { public static final String THROWABLE = "Throwable"; + public static final String THREAD_DEATH = "ThreadDeath"; + } }
