This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c580f08 KAFKA-7134: KafkaLog4jAppender exception handling with ignoreExceptions (#5415)
c580f08 is described below
commit c580f08b7775e4431a4a1559ef7239b69708a782
Author: Andras Katona <[email protected]>
AuthorDate: Wed Aug 29 13:30:39 2018 +0200
KAFKA-7134: KafkaLog4jAppender exception handling with ignoreExceptions (#5415)
Reviewers: Andras Beni <[email protected]>, Sandor Murakozi <[email protected]>, Rajini Sivaram <[email protected]>
---
build.gradle | 1 +
.../kafka/log4jappender/KafkaLog4jAppender.java | 28 +++++-
.../log4jappender/KafkaLog4jAppenderTest.java | 112 +++++++++++++++++++--
.../log4jappender/MockKafkaLog4jAppender.java | 6 +-
4 files changed, 135 insertions(+), 12 deletions(-)
diff --git a/build.gradle b/build.gradle
index 9b4610d..b13e948 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1235,6 +1235,7 @@ project(':log4j-appender') {
testCompile project(':clients').sourceSets.test.output
testCompile libs.junit
+ testCompile libs.easymock
}
javadoc {
diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
index 6ddaf92..0ae7dca 100644
--- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
+++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Future;
import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
@@ -64,10 +65,12 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
private String saslKerberosServiceName;
private String clientJaasConfPath;
private String kerb5ConfPath;
+ private Integer maxBlockMs;
private int retries = Integer.MAX_VALUE;
private int requiredNumAcks = 1;
private int deliveryTimeoutMs = 120000;
+ private boolean ignoreExceptions = true;
private boolean syncSend;
private Producer<byte[], byte[]> producer;
@@ -123,6 +126,14 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
this.topic = topic;
}
+ public boolean getIgnoreExceptions() {
+ return ignoreExceptions;
+ }
+
+ public void setIgnoreExceptions(boolean ignoreExceptions) {
+ this.ignoreExceptions = ignoreExceptions;
+ }
+
public boolean getSyncSend() {
return syncSend;
}
@@ -203,6 +214,14 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
return kerb5ConfPath;
}
+ public int getMaxBlockMs() {
+ return maxBlockMs;
+ }
+
+ public void setMaxBlockMs(int maxBlockMs) {
+ this.maxBlockMs = maxBlockMs;
+ }
+
@Override
public void activateOptions() {
// check for config parameter validity
@@ -242,6 +261,9 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
System.setProperty("java.security.krb5.conf", kerb5ConfPath);
}
}
+ if (maxBlockMs != null) {
+ props.put(MAX_BLOCK_MS_CONFIG, maxBlockMs);
+ }
props.put(KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
@@ -259,12 +281,14 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
String message = subAppend(event);
LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);
Future<RecordMetadata> response = producer.send(
- new ProducerRecord<byte[], byte[]>(topic, message.getBytes(StandardCharsets.UTF_8)));
+ new ProducerRecord<>(topic, message.getBytes(StandardCharsets.UTF_8)));
if (syncSend) {
try {
response.get();
} catch (InterruptedException | ExecutionException ex) {
- throw new RuntimeException(ex);
+ if (!ignoreExceptions)
+ throw new RuntimeException(ex);
+ LogLog.debug("Exception while getting response", ex);
}
}
}
diff --git a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
index 34be2e9..d5342e8 100644
--- a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
+++ b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
@@ -16,18 +16,31 @@
*/
package org.apache.kafka.log4jappender;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.helpers.LogLog;
+import org.easymock.EasyMock;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
-import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
public class KafkaLog4jAppenderTest {
- Logger logger = Logger.getLogger(KafkaLog4jAppenderTest.class);
+ private Logger logger = Logger.getLogger(KafkaLog4jAppenderTest.class);
+
+ @Before
+ public void setup() {
+ LogLog.setInternalDebugging(true);
+ }
@Test
public void testKafkaLog4jConfigs() {
@@ -66,22 +79,103 @@ public class KafkaLog4jAppenderTest {
@Test
- public void testLog4jAppends() throws UnsupportedEncodingException {
- PropertyConfigurator.configure(getLog4jConfig());
+ public void testLog4jAppends() {
+ PropertyConfigurator.configure(getLog4jConfig(false));
for (int i = 1; i <= 5; ++i) {
logger.error(getMessage(i));
}
Assert.assertEquals(
- 5, ((MockKafkaLog4jAppender) (logger.getRootLogger().getAppender("KAFKA"))).getHistory().size());
+ 5, (getMockKafkaLog4jAppender()).getHistory().size());
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testLog4jAppendsWithSyncSendAndSimulateProducerFailShouldThrowException() {
+ Properties props = getLog4jConfig(true);
+ props.put("log4j.appender.KAFKA.IgnoreExceptions", "false");
+ PropertyConfigurator.configure(props);
+
+ MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender();
+ replaceProducerWithMocked(mockKafkaLog4jAppender, false);
+
+ logger.error(getMessage(0));
+ }
+
+ @Test
+ public void testLog4jAppendsWithSyncSendWithoutIgnoringExceptionsShouldNotThrowException() {
+ Properties props = getLog4jConfig(true);
+ props.put("log4j.appender.KAFKA.IgnoreExceptions", "false");
+ PropertyConfigurator.configure(props);
+
+ MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender();
+ replaceProducerWithMocked(mockKafkaLog4jAppender, true);
+
+ logger.error(getMessage(0));
+ }
+
+ @Test
+ public void testLog4jAppendsWithRealProducerConfigWithSyncSendShouldNotThrowException() {
+ Properties props = getLog4jConfigWithRealProducer(true);
+ PropertyConfigurator.configure(props);
+
+ logger.error(getMessage(0));
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testLog4jAppendsWithRealProducerConfigWithSyncSendAndNotIgnoringExceptionsShouldThrowException() {
+ Properties props = getLog4jConfigWithRealProducer(false);
+ PropertyConfigurator.configure(props);
+
+ logger.error(getMessage(0));
}
- private byte[] getMessage(int i) throws UnsupportedEncodingException {
- return ("test_" + i).getBytes("UTF-8");
+ private void replaceProducerWithMocked(MockKafkaLog4jAppender mockKafkaLog4jAppender, boolean success) {
+ @SuppressWarnings("unchecked")
+ MockProducer<byte[], byte[]> producer = EasyMock.niceMock(MockProducer.class);
+ @SuppressWarnings("unchecked")
+ Future<RecordMetadata> futureMock = EasyMock.niceMock(Future.class);
+ try {
+ if (!success)
+ EasyMock.expect(futureMock.get())
+ .andThrow(new ExecutionException("simulated timeout", new TimeoutException()));
+ } catch (InterruptedException | ExecutionException e) {
+ // just mocking
+ }
+ EasyMock.expect(producer.send(EasyMock.anyObject())).andReturn(futureMock);
+ EasyMock.replay(producer, futureMock);
+ // reconfiguring mock appender
+ mockKafkaLog4jAppender.setKafkaProducer(producer);
+ mockKafkaLog4jAppender.activateOptions();
+ }
+
+ private MockKafkaLog4jAppender getMockKafkaLog4jAppender() {
+ return (MockKafkaLog4jAppender) Logger.getRootLogger().getAppender("KAFKA");
+ }
+
+ private byte[] getMessage(int i) {
+ return ("test_" + i).getBytes(StandardCharsets.UTF_8);
+ }
+
+ private Properties getLog4jConfigWithRealProducer(boolean ignoreExceptions) {
+ Properties props = new Properties();
+ props.put("log4j.rootLogger", "INFO, KAFKA");
+ props.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender");
+ props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
+ props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
+ props.put("log4j.appender.KAFKA.BrokerList", "127.0.0.2:9093");
+ props.put("log4j.appender.KAFKA.Topic", "test-topic");
+ props.put("log4j.appender.KAFKA.RequiredNumAcks", "1");
+ props.put("log4j.appender.KAFKA.SyncSend", "true");
+ // setting producer timeout (max.block.ms) to be low
+ props.put("log4j.appender.KAFKA.maxBlockMs", "10");
+ // ignoring exceptions
+ props.put("log4j.appender.KAFKA.IgnoreExceptions", Boolean.toString(ignoreExceptions));
+ props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
+ return props;
}
- private Properties getLog4jConfig() {
+ private Properties getLog4jConfig(boolean syncSend) {
Properties props = new Properties();
props.put("log4j.rootLogger", "INFO, KAFKA");
props.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.MockKafkaLog4jAppender");
@@ -90,7 +184,7 @@ public class KafkaLog4jAppenderTest {
props.put("log4j.appender.KAFKA.BrokerList", "127.0.0.1:9093");
props.put("log4j.appender.KAFKA.Topic", "test-topic");
props.put("log4j.appender.KAFKA.RequiredNumAcks", "1");
- props.put("log4j.appender.KAFKA.SyncSend", "false");
+ props.put("log4j.appender.KAFKA.SyncSend", Boolean.toString(syncSend));
props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
return props;
}
diff --git a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java
index 8040be4..a9eb5fb 100644
--- a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java
+++ b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java
@@ -34,6 +34,10 @@ public class MockKafkaLog4jAppender extends KafkaLog4jAppender {
return mockProducer;
}
+ void setKafkaProducer(MockProducer<byte[], byte[]> producer) {
+ this.mockProducer = producer;
+ }
+
@Override
protected void append(LoggingEvent event) {
if (super.getProducer() == null) {
@@ -42,7 +46,7 @@ public class MockKafkaLog4jAppender extends KafkaLog4jAppender {
super.append(event);
}
- protected List<ProducerRecord<byte[], byte[]>> getHistory() {
+ List<ProducerRecord<byte[], byte[]>> getHistory() {
return mockProducer.history();
}
}