Use the Log4jThread constructor to name the KafkaManager close thread instead of calling setName() afterwards. Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/f1b61bdd Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/f1b61bdd Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/f1b61bdd
Branch: refs/heads/LOG4J2-1528 Commit: f1b61bddf640ecc5d2c80ea190ff7332577cb787 Parents: 1aa3c3e Author: Gary Gregory <ggreg...@apache.org> Authored: Sat Aug 20 17:30:16 2016 -0700 Committer: Gary Gregory <ggreg...@apache.org> Committed: Sat Aug 20 17:30:16 2016 -0700 ---------------------------------------------------------------------- .../core/appender/mom/kafka/KafkaManager.java | 185 +++++++++---------- 1 file changed, 92 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/f1b61bdd/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java index 4e4a09c..d535e02 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java @@ -1,93 +1,92 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache license, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the license for the specific language governing permissions and - * limitations under the license. - */ - -package org.apache.logging.log4j.core.appender.mom.kafka; - -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.logging.log4j.core.appender.AbstractManager; -import org.apache.logging.log4j.core.config.Property; -import org.apache.logging.log4j.core.util.Log4jThread; - -public class KafkaManager extends AbstractManager { - - public static final String DEFAULT_TIMEOUT_MILLIS = "30000"; - - /** - * package-private access for testing. - */ - static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(); - - private final Properties config = new Properties(); - private Producer<byte[], byte[]> producer = null; - private final int timeoutMillis; - - private final String topic; - - public KafkaManager(final String name, final String topic, final Property[] properties) { - super(name); - this.topic = topic; - config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - config.setProperty("batch.size", "0"); - for (final Property property : properties) { - config.setProperty(property.getName(), property.getValue()); - } - this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS)); - } - - @Override - public void releaseSub() { - if (producer != null) { - // This thread is a workaround for this Kafka issue: https://issues.apache.org/jira/browse/KAFKA-1660 - final Thread closeThread = new Log4jThread(new Runnable() { - @Override - public void run() { - producer.close(); - } - }); - 
closeThread.setName("KafkaManager-CloseThread"); - closeThread.setDaemon(true); // avoid blocking JVM shutdown - closeThread.start(); - try { - closeThread.join(timeoutMillis); - } catch (final InterruptedException ignore) { - // ignore - } - } - } - - public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException { - if (producer != null) { - producer.send(new ProducerRecord<byte[], byte[]>(topic, msg)).get(timeoutMillis, TimeUnit.MILLISECONDS); - } - } - - public void startup() { - producer = producerFactory.newKafkaProducer(config); - } - - public String getTopic() { - return topic; - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. 
+ */ + +package org.apache.logging.log4j.core.appender.mom.kafka; + +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.logging.log4j.core.appender.AbstractManager; +import org.apache.logging.log4j.core.config.Property; +import org.apache.logging.log4j.core.util.Log4jThread; + +public class KafkaManager extends AbstractManager { + + public static final String DEFAULT_TIMEOUT_MILLIS = "30000"; + + /** + * package-private access for testing. + */ + static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(); + + private final Properties config = new Properties(); + private Producer<byte[], byte[]> producer = null; + private final int timeoutMillis; + + private final String topic; + + public KafkaManager(final String name, final String topic, final Property[] properties) { + super(name); + this.topic = topic; + config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + config.setProperty("batch.size", "0"); + for (final Property property : properties) { + config.setProperty(property.getName(), property.getValue()); + } + this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS)); + } + + @Override + public void releaseSub() { + if (producer != null) { + // This thread is a workaround for this Kafka issue: https://issues.apache.org/jira/browse/KAFKA-1660 + final Thread closeThread = new Log4jThread(new Runnable() { + @Override + public void run() { + producer.close(); + } + }, "KafkaManager-CloseThread"); + closeThread.setDaemon(true); // avoid blocking JVM shutdown + closeThread.start(); + try { + closeThread.join(timeoutMillis); 
+ } catch (final InterruptedException ignore) { + // ignore + } + } + } + + public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException { + if (producer != null) { + producer.send(new ProducerRecord<byte[], byte[]>(topic, msg)).get(timeoutMillis, TimeUnit.MILLISECONDS); + } + } + + public void startup() { + producer = producerFactory.newKafkaProducer(config); + } + + public String getTopic() { + return topic; + } + +}