This is an automated email from the ASF dual-hosted git repository.

vy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git

commit 6bf68b73b322b9b52e2ff7d969133fb816f57d38
Author: Volkan Yazıcı <[email protected]>
AuthorDate: Sat Jan 9 15:58:29 2021 +0100

    LOG4J2-2972 Refactor AsyncAppender and AppenderControl for handling of Throwables. (#452)
---
 .../org/apache/logging/log4j/util/SneakyThrow.java |  13 --
 .../logging/log4j/core/appender/AsyncAppender.java | 136 ++--------------
 .../appender/AsyncAppenderEventDispatcher.java     | 174 +++++++++++++++++++++
 .../logging/log4j/core/config/AppenderControl.java |   2 +-
 .../async/AsyncAppenderExceptionHandlingTest.java  |   9 +-
 .../log4j/test/appender/FailOnceAppender.java      |  15 +-
 6 files changed, 209 insertions(+), 140 deletions(-)

diff --git 
a/log4j-api/src/test/java/org/apache/logging/log4j/util/SneakyThrow.java 
b/log4j-api/src/test/java/org/apache/logging/log4j/util/SneakyThrow.java
deleted file mode 100644
index af26e6e..0000000
--- a/log4j-api/src/test/java/org/apache/logging/log4j/util/SneakyThrow.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.logging.log4j.util;
-
-public enum SneakyThrow {;
-
-    /**
-     * Throws any exception (including checked ones!) without defining it in the method signature.
-     */
-    @SuppressWarnings("unchecked")
-    public static <E extends Throwable> void sneakyThrow(final Throwable 
throwable) throws E {
-        throw (E) throwable;
-    }
-
-}
diff --git 
a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppender.java
 
b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppender.java
index d4a270b..141eb95 100644
--- 
a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppender.java
+++ 
b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppender.java
@@ -16,7 +16,6 @@
  */
 package org.apache.logging.log4j.core.appender;
 
-import org.apache.logging.log4j.core.AbstractLogEvent;
 import org.apache.logging.log4j.core.Appender;
 import org.apache.logging.log4j.core.Core;
 import org.apache.logging.log4j.core.Filter;
@@ -37,7 +36,6 @@ import org.apache.logging.log4j.core.config.Property;
 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
 import org.apache.logging.log4j.core.filter.AbstractFilterable;
 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
-import org.apache.logging.log4j.core.util.Log4jThread;
 import org.apache.logging.log4j.plugins.Plugin;
 import org.apache.logging.log4j.plugins.PluginAliases;
 import org.apache.logging.log4j.plugins.PluginBuilderAttribute;
@@ -52,7 +50,6 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TransferQueue;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Appends to one or more Appenders asynchronously. You can configure an 
AsyncAppender with one or more Appenders and an
@@ -63,11 +60,6 @@ import java.util.concurrent.atomic.AtomicLong;
 public final class AsyncAppender extends AbstractAppender {
 
     private static final int DEFAULT_QUEUE_SIZE = 1024;
-    private static final LogEvent SHUTDOWN_LOG_EVENT = new AbstractLogEvent() {
-        // empty
-    };
-
-    private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
 
     private final BlockingQueue<LogEvent> queue;
     private final int queueSize;
@@ -78,13 +70,13 @@ public final class AsyncAppender extends AbstractAppender {
     private final String errorRef;
     private final boolean includeLocation;
     private AppenderControl errorAppender;
-    private AsyncThread thread;
+    private AsyncAppenderEventDispatcher dispatcher;
     private AsyncQueueFullPolicy asyncQueueFullPolicy;
 
     private AsyncAppender(final String name, final Filter filter, final 
AppenderRef[] appenderRefs,
             final String errorRef, final int queueSize, final boolean 
blocking, final boolean ignoreExceptions,
             final long shutdownTimeout, final Configuration config, final 
boolean includeLocation,
-            final BlockingQueueFactory<LogEvent> blockingQueueFactory, 
Property[] properties) {
+            final BlockingQueueFactory<LogEvent> blockingQueueFactory, final 
Property[] properties) {
         super(name, filter, null, ignoreExceptions, properties);
         this.queue = blockingQueueFactory.create(queueSize);
         this.queueSize = queueSize;
@@ -117,14 +109,14 @@ public final class AsyncAppender extends AbstractAppender 
{
             }
         }
         if (appenders.size() > 0) {
-            thread = new AsyncThread(appenders, queue);
-            thread.setName("AsyncAppender-" + getName());
+            dispatcher = new AsyncAppenderEventDispatcher(
+                    getName(), errorAppender, appenders, queue);
         } else if (errorRef == null) {
             throw new ConfigurationException("No appenders are available for 
AsyncAppender " + getName());
         }
         asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
 
-        thread.start();
+        dispatcher.start();
         super.start();
     }
 
@@ -133,10 +125,11 @@ public final class AsyncAppender extends AbstractAppender 
{
         setStopping();
         super.stop(timeout, timeUnit, false);
         LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", 
queue.size());
-        thread.shutdown();
         try {
-            thread.join(shutdownTimeout);
-        } catch (final InterruptedException ex) {
+            dispatcher.stop(shutdownTimeout);
+        } catch (final InterruptedException ignored) {
+            // Restore the interrupted flag cleared when the exception is caught.
+            Thread.currentThread().interrupt();
             LOGGER.warn("Interrupted while stopping AsyncAppender {}", 
getName());
         }
         LOGGER.trace("AsyncAppender stopped. Queue has {} events.", 
queue.size());
@@ -169,7 +162,7 @@ public final class AsyncAppender extends AbstractAppender {
                     logMessageInCurrentThread(logEvent);
                 } else {
                     // delegate to the event router (which may discard, 
enqueue and block, or log in current thread)
-                    final EventRoute route = 
asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
+                    final EventRoute route = 
asyncQueueFullPolicy.getRoute(dispatcher.getId(), memento.getLevel());
                     route.logMessage(this, memento);
                 }
             } else {
@@ -192,8 +185,7 @@ public final class AsyncAppender extends AbstractAppender {
      */
     public void logMessageInCurrentThread(final LogEvent logEvent) {
         logEvent.setEndOfBatch(queue.isEmpty());
-        final boolean appendSuccessful = thread.callAppenders(logEvent);
-        logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
+        dispatcher.dispatch(logEvent);
     }
 
     /**
@@ -205,7 +197,7 @@ public final class AsyncAppender extends AbstractAppender {
         try {
             // wait for free slots in the queue
             queue.put(logEvent);
-        } catch (final InterruptedException e) {
+        } catch (final InterruptedException ignored) {
             final boolean appendSuccessful = 
handleInterruptedException(logEvent);
             logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
         }
@@ -245,7 +237,7 @@ public final class AsyncAppender extends AbstractAppender {
     }
 
     public static class Builder<B extends Builder<B>> extends 
AbstractFilterable.Builder<B>
-            implements 
org.apache.logging.log4j.plugins.util.Builder<AsyncAppender> {
+            implements 
org.apache.logging.log4j.core.util.Builder<AsyncAppender> {
 
         @PluginElement("AppenderRef")
         @Required(message = "No appender references provided to AsyncAppender")
@@ -338,103 +330,6 @@ public final class AsyncAppender extends AbstractAppender 
{
     }
 
     /**
-     * Thread that calls the Appenders.
-     */
-    private class AsyncThread extends Log4jThread {
-
-        private volatile boolean shutdown;
-        private final List<AppenderControl> appenders;
-        private final BlockingQueue<LogEvent> queue;
-
-        public AsyncThread(final List<AppenderControl> appenders, final 
BlockingQueue<LogEvent> queue) {
-            super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
-            this.appenders = appenders;
-            this.queue = queue;
-            setDaemon(true);
-        }
-
-        @Override
-        public void run() {
-            while (!shutdown) {
-                LogEvent event;
-                try {
-                    event = queue.take();
-                    if (event == SHUTDOWN_LOG_EVENT) {
-                        shutdown = true;
-                        continue;
-                    }
-                } catch (final InterruptedException ex) {
-                    break; // LOG4J2-830
-                }
-                event.setEndOfBatch(queue.isEmpty());
-                final boolean success = callAppenders(event);
-                if (!success && errorAppender != null) {
-                    try {
-                        errorAppender.callAppender(event);
-                    } catch (final Exception ex) {
-                        // Silently accept the error.
-                    }
-                }
-            }
-            // Process any remaining items in the queue.
-            LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing 
remaining {} queue events.",
-                queue.size());
-            int count = 0;
-            int ignored = 0;
-            while (!queue.isEmpty()) {
-                try {
-                    final LogEvent event = queue.take();
-                    if (event instanceof Log4jLogEvent) {
-                        final Log4jLogEvent logEvent = (Log4jLogEvent) event;
-                        logEvent.setEndOfBatch(queue.isEmpty());
-                        callAppenders(logEvent);
-                        count++;
-                    } else {
-                        ignored++;
-                        LOGGER.trace("Ignoring event of class {}", 
event.getClass().getName());
-                    }
-                } catch (final InterruptedException ex) {
-                    // May have been interrupted to shut down.
-                    // Here we ignore interrupts and try to process all 
remaining events.
-                }
-            }
-            LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} 
events remaining. "
-                + "Processed {} and ignored {} events since shutdown 
started.", queue.size(), count, ignored);
-        }
-
-        /**
-         * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} 
on all registered {@code AppenderControl}
-         * objects, and returns {@code true} if at least one appender call was 
successful, {@code false} otherwise. Any
-         * exceptions are silently ignored.
-         *
-         * @param event the event to forward to the registered appenders
-         * @return {@code true} if at least one appender call succeeded, 
{@code false} otherwise
-         */
-        boolean callAppenders(final LogEvent event) {
-            boolean success = false;
-            for (final AppenderControl control : appenders) {
-                try {
-                    control.callAppender(event);
-                    success = true;
-                } catch (final Exception ex) {
-                    // If no appender is successful the error appender will 
get it.
-                }
-            }
-            return success;
-        }
-
-        public void shutdown() {
-            shutdown = true;
-            if (queue.isEmpty()) {
-                queue.offer(SHUTDOWN_LOG_EVENT);
-            }
-            if (getState() == State.TIMED_WAITING || getState() == 
State.WAITING) {
-                this.interrupt(); // LOG4J2-1422: if underlying appender is 
stuck in wait/sleep/join/park call
-            }
-        }
-    }
-
-    /**
      * Returns the names of the appenders that this asyncAppender delegates to 
as an array of Strings.
      *
      * @return the names of the sink appenders
@@ -486,11 +381,12 @@ public final class AsyncAppender extends AbstractAppender 
{
 
     /**
      * Returns the number of elements in the queue.
-     * 
-     * @return the number of elements in the queue. 
+     *
+     * @return the number of elements in the queue.
      * @since 2.11.1
      */
     public int getQueueSize() {
         return queue.size();
     }
+
 }
diff --git 
a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
 
b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
new file mode 100644
index 0000000..a01f68d
--- /dev/null
+++ 
b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/AsyncAppenderEventDispatcher.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.status.StatusLogger;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class AsyncAppenderEventDispatcher extends Log4jThread {
+
+    private static final LogEvent STOP_EVENT = new Log4jLogEvent();
+
+    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0);
+
+    private static final Logger LOGGER = StatusLogger.getLogger();
+
+    private final AppenderControl errorAppender;
+
+    private final List<AppenderControl> appenders;
+
+    private final BlockingQueue<LogEvent> queue;
+
+    private final AtomicBoolean stoppedRef;
+
+    AsyncAppenderEventDispatcher(
+            final String name,
+            final AppenderControl errorAppender,
+            final List<AppenderControl> appenders,
+            final BlockingQueue<LogEvent> queue) {
+        super("AsyncAppenderEventDispatcher-" + 
THREAD_COUNTER.incrementAndGet() + "-" + name);
+        this.errorAppender = errorAppender;
+        this.appenders = appenders;
+        this.queue = queue;
+        this.stoppedRef = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        LOGGER.trace("{} has started.", getName());
+        dispatchAll();
+        dispatchRemaining();
+    }
+
+    private void dispatchAll() {
+        while (!stoppedRef.get()) {
+            LogEvent event;
+            try {
+                event = queue.take();
+            } catch (final InterruptedException ignored) {
+                // Restore the interrupted flag cleared when the exception is caught.
+                interrupt();
+                break;
+            }
+            if (event == STOP_EVENT) {
+                break;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+        }
+        LOGGER.trace("{} has stopped.", getName());
+    }
+
+    private void dispatchRemaining() {
+        int eventCount = 0;
+        while (true) {
+            // Note the non-blocking Queue#poll() method!
+            final LogEvent event = queue.poll();
+            if (event == null) {
+                break;
+            }
+            // Allow events that managed to be submitted after the sentinel.
+            if (event == STOP_EVENT) {
+                continue;
+            }
+            event.setEndOfBatch(queue.isEmpty());
+            dispatch(event);
+            eventCount++;
+        }
+        LOGGER.trace(
+                "{} has processed the last {} remaining event(s).",
+                getName(), eventCount);
+    }
+
+    /**
+     * Dispatches the given {@code event} to the registered appenders <b>in the
+     * current thread</b>.
+     */
+    void dispatch(final LogEvent event) {
+
+        // Dispatch the event to all registered appenders.
+        boolean succeeded = false;
+        // noinspection ForLoopReplaceableByForEach (avoid iterator instantiation)
+        for (int appenderIndex = 0; appenderIndex < appenders.size(); 
appenderIndex++) {
+            final AppenderControl control = appenders.get(appenderIndex);
+            try {
+                control.callAppender(event);
+                succeeded = true;
+            } catch (final Throwable error) {
+                // If no appender is successful, the error appender will get it.
+                // It is okay to simply log it here.
+                LOGGER.trace(
+                        "{} has failed to call appender {}",
+                        getName(), control.getAppenderName(), error);
+            }
+        }
+
+        // Fallback to the error appender if none has succeeded so far.
+        if (!succeeded && errorAppender != null) {
+            try {
+                errorAppender.callAppender(event);
+            } catch (final Throwable error) {
+                // If the error appender also fails, there is nothing further
+                // we can do about it.
+                LOGGER.trace(
+                        "{} has failed to call the error appender {}",
+                        getName(), errorAppender.getAppenderName(), error);
+            }
+        }
+
+    }
+
+    void stop(final long timeoutMillis) throws InterruptedException {
+
+        // Mark the completion, if necessary.
+        final boolean stopped = stoppedRef.compareAndSet(false, true);
+        if (stopped) {
+            LOGGER.trace("{} is signaled to stop.", getName());
+        }
+
+        // There is a slight chance that the thread is not started yet, wait for
+        // it to run. Otherwise, interrupt+join might block.
+        // noinspection StatementWithEmptyBody
+        while (Thread.State.NEW.equals(getState()));
+
+        // Enqueue the stop event, if there is sufficient room; otherwise,
+        // fallback to interruption. (We should avoid interrupting the thread if
+        // at all possible due to the subtleties of Java interruption, which
+        // will actually close sockets if any blocking operations are in
+        // progress! This means a socket appender may surprisingly fail to
+        // deliver final events. I recall some oddities with file I/O as well.
+        // — ckozak)
+        final boolean added = queue.offer(STOP_EVENT);
+        if (!added) {
+            interrupt();
+        }
+
+        // Wait for the completion.
+        join(timeoutMillis);
+
+    }
+
+}
diff --git 
a/log4j-core/src/main/java/org/apache/logging/log4j/core/config/AppenderControl.java
 
b/log4j-core/src/main/java/org/apache/logging/log4j/core/config/AppenderControl.java
index 1e71afa..a4d00d3 100644
--- 
a/log4j-core/src/main/java/org/apache/logging/log4j/core/config/AppenderControl.java
+++ 
b/log4j-core/src/main/java/org/apache/logging/log4j/core/config/AppenderControl.java
@@ -156,7 +156,7 @@ public class AppenderControl extends AbstractFilterable {
             appender.append(event);
         } catch (final RuntimeException error) {
             handleAppenderError(event, error);
-        } catch (final Throwable error) {
+        } catch (final Exception error) {
             handleAppenderError(event, new AppenderLoggingException(error));
         }
     }
diff --git 
a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/AsyncAppenderExceptionHandlingTest.java
 
b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/AsyncAppenderExceptionHandlingTest.java
index 8318c81..164bdca 100644
--- 
a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/AsyncAppenderExceptionHandlingTest.java
+++ 
b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/AsyncAppenderExceptionHandlingTest.java
@@ -39,10 +39,10 @@ import java.util.stream.Collectors;
  * Verifies {@link AsyncAppender} works after certain type of {@link Appender}
  * failures.
  * <p>
- * {@link AsyncAppender} thread is known to get killed due to
+ * {@code AsyncAppender} thread is known to get killed due to
  * {@link AppenderControl} leaking exceptions in the past. This class is more
- * of an end-to-end test to verify that {@link AppenderControl} catches all 
kind
- * of {@link Throwable}s.
+ * of an end-to-end test to verify that {@code AsyncAppender} still works even
+ * if the background thread gets killed.
  */
 class AsyncAppenderExceptionHandlingTest {
 
@@ -52,7 +52,8 @@ class AsyncAppenderExceptionHandlingTest {
             FailOnceAppender.ThrowableClassName.LOGGING_EXCEPTION,
             FailOnceAppender.ThrowableClassName.EXCEPTION,
             FailOnceAppender.ThrowableClassName.ERROR,
-            FailOnceAppender.ThrowableClassName.THROWABLE
+            FailOnceAppender.ThrowableClassName.THROWABLE,
+            FailOnceAppender.ThrowableClassName.THREAD_DEATH
     })
     void AsyncAppender_should_not_stop_on_appender_failures(String 
throwableClassName) {
 
diff --git 
a/log4j-core/src/test/java/org/apache/logging/log4j/test/appender/FailOnceAppender.java
 
b/log4j-core/src/test/java/org/apache/logging/log4j/test/appender/FailOnceAppender.java
index 7fe0296..6a8f5f2 100644
--- 
a/log4j-core/src/test/java/org/apache/logging/log4j/test/appender/FailOnceAppender.java
+++ 
b/log4j-core/src/test/java/org/apache/logging/log4j/test/appender/FailOnceAppender.java
@@ -23,9 +23,9 @@ import 
org.apache.logging.log4j.core.appender.AbstractAppender;
 import org.apache.logging.log4j.core.config.Property;
 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.util.Throwables;
 import org.apache.logging.log4j.plugins.Plugin;
 import org.apache.logging.log4j.plugins.validation.constraints.Required;
-import org.apache.logging.log4j.util.SneakyThrow;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -53,7 +53,7 @@ public class FailOnceAppender extends AbstractAppender {
         if (!failed) {
             failed = true;
             Throwable throwable = throwableSupplier.get();
-            SneakyThrow.sneakyThrow(throwable);
+            Throwables.rethrow(throwable);
         }
         events.add(event);
     }
@@ -95,11 +95,20 @@ public class FailOnceAppender extends AbstractAppender {
             case ThrowableClassName.EXCEPTION: return () -> new 
Exception(message);
             case ThrowableClassName.ERROR: return () -> new Error(message);
             case ThrowableClassName.THROWABLE: return () -> new 
Throwable(message);
+            case ThrowableClassName.THREAD_DEATH: return () -> {
+                stopCurrentThread();
+                throw new IllegalStateException("should not have reached 
here");
+            };
             default: throw new IllegalArgumentException("unknown throwable 
class name: " + throwableClassName);
         }
 
     }
 
+    @SuppressWarnings("deprecation")
+    private static void stopCurrentThread() {
+        Thread.currentThread().stop();
+    }
+
     public enum ThrowableClassName {;
 
         public static final String RUNTIME_EXCEPTION = "RuntimeException";
@@ -112,6 +121,8 @@ public class FailOnceAppender extends AbstractAppender {
 
         public static final String THROWABLE = "Throwable";
 
+        public static final String THREAD_DEATH = "ThreadDeath";
+
     }
 
 }

Reply via email to