Repository: kafka Updated Branches: refs/heads/trunk 454f0f1ca -> bbc390018
MINOR: Use static imports in KafkaLog4jAppender instead of redefining the constants. Author: Kamal C <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #3349 from Kamal15/log4j2 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bbc39001 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bbc39001 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bbc39001 Branch: refs/heads/trunk Commit: bbc390018571602fb4fb77609093d450ab95a4c0 Parents: 454f0f1 Author: Kamal C <[email protected]> Authored: Tue Jul 18 16:50:24 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Tue Jul 18 16:50:29 2017 +0100 ---------------------------------------------------------------------- .../kafka/log4jappender/KafkaLog4jAppender.java | 85 ++++++++++---------- .../log4jappender/MockKafkaLog4jAppender.java | 5 +- 2 files changed, 44 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/bbc39001/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java ---------------------------------------------------------------------- diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java index 513052e..6a09cab 100644 --- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java +++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java @@ -16,15 +16,12 @@ */ package org.apache.kafka.log4jappender; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import 
org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.helpers.LogLog; import org.apache.log4j.spi.LoggingEvent; @@ -35,43 +32,43 @@ import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_TYPE_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG; +import static org.apache.kafka.common.config.SaslConfigs.SASL_KERBEROS_SERVICE_NAME; + /** * A log4j appender that produces log messages to Kafka */ public class KafkaLog4jAppender extends AppenderSkeleton { - private static final String BOOTSTRAP_SERVERS_CONFIG = ProducerConfig.BOOTSTRAP_SERVERS_CONFIG; - private static final String COMPRESSION_TYPE_CONFIG = ProducerConfig.COMPRESSION_TYPE_CONFIG; - private static final 
String ACKS_CONFIG = ProducerConfig.ACKS_CONFIG; - private static final String RETRIES_CONFIG = ProducerConfig.RETRIES_CONFIG; - private static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; - private static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; - private static final String SECURITY_PROTOCOL = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; - private static final String SSL_TRUSTSTORE_LOCATION = SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG; - private static final String SSL_TRUSTSTORE_PASSWORD = SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG; - private static final String SSL_KEYSTORE_TYPE = SslConfigs.SSL_KEYSTORE_TYPE_CONFIG; - private static final String SSL_KEYSTORE_LOCATION = SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG; - private static final String SSL_KEYSTORE_PASSWORD = SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG; - private static final String SASL_KERBEROS_SERVICE_NAME = SaslConfigs.SASL_KERBEROS_SERVICE_NAME; - - private String brokerList = null; - private String topic = null; - private String compressionType = null; - private String securityProtocol = null; - private String sslTruststoreLocation = null; - private String sslTruststorePassword = null; - private String sslKeystoreType = null; - private String sslKeystoreLocation = null; - private String sslKeystorePassword = null; - private String saslKerberosServiceName = null; - private String clientJaasConfPath = null; - private String kerb5ConfPath = null; - - private int retries = 0; + private String brokerList; + private String topic; + private String compressionType; + private String securityProtocol; + private String sslTruststoreLocation; + private String sslTruststorePassword; + private String sslKeystoreType; + private String sslKeystoreLocation; + private String sslKeystorePassword; + private String saslKerberosServiceName; + private String clientJaasConfPath; + private String kerb5ConfPath; + + private int retries; 
private int requiredNumAcks = Integer.MAX_VALUE; - private boolean syncSend = false; - private Producer<byte[], byte[]> producer = null; - + private boolean syncSend; + private Producer<byte[], byte[]> producer; + public Producer<byte[], byte[]> getProducer() { return producer; } @@ -213,18 +210,18 @@ public class KafkaLog4jAppender extends AppenderSkeleton { if (retries > 0) props.put(RETRIES_CONFIG, retries); if (securityProtocol != null) { - props.put(SECURITY_PROTOCOL, securityProtocol); + props.put(SECURITY_PROTOCOL_CONFIG, securityProtocol); } if (securityProtocol != null && securityProtocol.contains("SSL") && sslTruststoreLocation != null && sslTruststorePassword != null) { - props.put(SSL_TRUSTSTORE_LOCATION, sslTruststoreLocation); - props.put(SSL_TRUSTSTORE_PASSWORD, sslTruststorePassword); + props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation); + props.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword); if (sslKeystoreType != null && sslKeystoreLocation != null && sslKeystorePassword != null) { - props.put(SSL_KEYSTORE_TYPE, sslKeystoreType); - props.put(SSL_KEYSTORE_LOCATION, sslKeystoreLocation); - props.put(SSL_KEYSTORE_PASSWORD, sslKeystorePassword); + props.put(SSL_KEYSTORE_TYPE_CONFIG, sslKeystoreType); + props.put(SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation); + props.put(SSL_KEYSTORE_PASSWORD_CONFIG, sslKeystorePassword); } } if (securityProtocol != null && securityProtocol.contains("SASL") && saslKerberosServiceName != null && clientJaasConfPath != null) { @@ -235,15 +232,15 @@ public class KafkaLog4jAppender extends AppenderSkeleton { } } - props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put(KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + props.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); this.producer = 
getKafkaProducer(props); LogLog.debug("Kafka producer connected to " + brokerList); LogLog.debug("Logging for topic: " + topic); } protected Producer<byte[], byte[]> getKafkaProducer(Properties props) { - return new KafkaProducer<byte[], byte[]>(props); + return new KafkaProducer<>(props); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/bbc39001/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java ---------------------------------------------------------------------- diff --git a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java index 87aaca8..8040be4 100644 --- a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java +++ b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java @@ -22,11 +22,12 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.test.MockSerializer; import org.apache.log4j.spi.LoggingEvent; +import java.util.List; import java.util.Properties; public class MockKafkaLog4jAppender extends KafkaLog4jAppender { private MockProducer<byte[], byte[]> mockProducer = - new MockProducer<byte[], byte[]>(false, new MockSerializer(), new MockSerializer()); + new MockProducer<>(false, new MockSerializer(), new MockSerializer()); @Override protected Producer<byte[], byte[]> getKafkaProducer(Properties props) { @@ -41,7 +42,7 @@ public class MockKafkaLog4jAppender extends KafkaLog4jAppender { super.append(event); } - protected java.util.List<ProducerRecord<byte[], byte[]>> getHistory() { + protected List<ProducerRecord<byte[], byte[]>> getHistory() { return mockProducer.history(); } }
