This is an automated email from the ASF dual-hosted git repository. vy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git
commit 97db5743a3b10e9017bf70794d6275b21553dd44 Author: Volkan Yazıcı <volkan.yaz...@gmail.com> AuthorDate: Thu Nov 12 17:46:20 2020 +0100 LOG4J2-2916 Avoid redundant Kafka producer instantiation causing thread leaks. --- .../logging/log4j/kafka/appender/KafkaManager.java | 4 +- .../KafkaManagerProducerThreadLeakTest.java | 65 ++++++++++++++++++++++ src/changes/changes.xml | 3 + 3 files changed, 71 insertions(+), 1 deletion(-) diff --git a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java index a4e4ddb..db03167 100644 --- a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java +++ b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java @@ -153,7 +153,9 @@ public class KafkaManager extends AbstractManager { } public void startup() { - producer = producerFactory.newKafkaProducer(config); + if (producer == null) { + producer = producerFactory.newKafkaProducer(config); + } } public String getTopic() { diff --git a/log4j-kafka/src/test/java/org/apache/logging/log4j/kafka/appender/KafkaManagerProducerThreadLeakTest.java b/log4j-kafka/src/test/java/org/apache/logging/log4j/kafka/appender/KafkaManagerProducerThreadLeakTest.java new file mode 100644 index 0000000..73fcf62 --- /dev/null +++ b/log4j-kafka/src/test/java/org/apache/logging/log4j/kafka/appender/KafkaManagerProducerThreadLeakTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.logging.log4j.kafka.appender; + +import org.apache.logging.log4j.categories.Appenders; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.junit.LoggerContextSource; +import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Verifies that restarting the {@link LoggerContext} doesn't cause + * {@link KafkaManager} to leak threads. + * + * @see <a href="https://issues.apache.org/jira/browse/LOG4J2-2916">LOG4J2-2916</a> + */ +@Category(Appenders.Kafka.class) +@LoggerContextSource("KafkaAppenderTest.xml") +class KafkaManagerProducerThreadLeakTest { + + @Test + void context_restart_shouldnt_leak_producer_threads(final LoggerContext context) { + + // Determine the initial number of threads. + final int initialThreadCount = kafkaProducerThreadCount(); + + // Perform context restarts. + final int contextRestartCount = 3; + for (int i = 0; i < contextRestartCount; i++) { + context.reconfigure(); + } + + // Verify the final thread count. + final int lastThreadCount = kafkaProducerThreadCount(); + assertEquals(initialThreadCount, lastThreadCount); + + } + + private static int kafkaProducerThreadCount() { + final long threadCount = Thread + .getAllStackTraces() + .keySet() + .stream() + .filter(thread -> thread.getName().startsWith("kafka-producer")) + .count(); + return Math.toIntExact(threadCount); + } + +} diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 313a922..e191147 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -31,6 +31,9 @@ - "remove" - Removed --> <release version="3.0.0" date="2019-xx-xx" description="GA Release 3.0.0"> + <action issue="LOG4J2-2916" dev="vy" type="fix" due-to="wuqian0808"> + Avoid redundant Kafka producer instantiation causing thread leaks. + </action> <action dev="vy" type="update"> Update jetty-util from 8.2.0.v20160908 to 9.4.31.v20200723. </action>