This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c580f08  KAFKA-7134: KafkaLog4jAppender exception handling with 
ignoreExceptions (#5415)
c580f08 is described below

commit c580f08b7775e4431a4a1559ef7239b69708a782
Author: Andras Katona <[email protected]>
AuthorDate: Wed Aug 29 13:30:39 2018 +0200

    KAFKA-7134: KafkaLog4jAppender exception handling with ignoreExceptions 
(#5415)
    
    Reviewers: Andras Beni <[email protected]>, Sandor Murakozi 
<[email protected]>, Rajini Sivaram <[email protected]>
---
 build.gradle                                       |   1 +
 .../kafka/log4jappender/KafkaLog4jAppender.java    |  28 +++++-
 .../log4jappender/KafkaLog4jAppenderTest.java      | 112 +++++++++++++++++++--
 .../log4jappender/MockKafkaLog4jAppender.java      |   6 +-
 4 files changed, 135 insertions(+), 12 deletions(-)

diff --git a/build.gradle b/build.gradle
index 9b4610d..b13e948 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1235,6 +1235,7 @@ project(':log4j-appender') {
 
     testCompile project(':clients').sourceSets.test.output
     testCompile libs.junit
+    testCompile libs.easymock
   }
 
   javadoc {
diff --git 
a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
 
b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
index 6ddaf92..0ae7dca 100644
--- 
a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
+++ 
b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Future;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
@@ -64,10 +65,12 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
     private String saslKerberosServiceName;
     private String clientJaasConfPath;
     private String kerb5ConfPath;
+    private Integer maxBlockMs;
 
     private int retries = Integer.MAX_VALUE;
     private int requiredNumAcks = 1;
     private int deliveryTimeoutMs = 120000;
+    private boolean ignoreExceptions = true;
     private boolean syncSend;
     private Producer<byte[], byte[]> producer;
     
@@ -123,6 +126,14 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
         this.topic = topic;
     }
 
+    public boolean getIgnoreExceptions() {
+        return ignoreExceptions;
+    }
+
+    public void setIgnoreExceptions(boolean ignoreExceptions) {
+        this.ignoreExceptions = ignoreExceptions;
+    }
+
     public boolean getSyncSend() {
         return syncSend;
     }
@@ -203,6 +214,14 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
         return kerb5ConfPath;
     }
 
+    public int getMaxBlockMs() {
+        return maxBlockMs;
+    }
+
+    public void setMaxBlockMs(int maxBlockMs) {
+        this.maxBlockMs = maxBlockMs;
+    }
+
     @Override
     public void activateOptions() {
         // check for config parameter validity
@@ -242,6 +261,9 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
                 System.setProperty("java.security.krb5.conf", kerb5ConfPath);
             }
         }
+        if (maxBlockMs != null) {
+            props.put(MAX_BLOCK_MS_CONFIG, maxBlockMs);
+        }
 
         props.put(KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         props.put(VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
@@ -259,12 +281,14 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
         String message = subAppend(event);
         LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);
         Future<RecordMetadata> response = producer.send(
-            new ProducerRecord<byte[], byte[]>(topic, 
message.getBytes(StandardCharsets.UTF_8)));
+            new ProducerRecord<>(topic, 
message.getBytes(StandardCharsets.UTF_8)));
         if (syncSend) {
             try {
                 response.get();
             } catch (InterruptedException | ExecutionException ex) {
-                throw new RuntimeException(ex);
+                if (!ignoreExceptions)
+                    throw new RuntimeException(ex);
+                LogLog.debug("Exception while getting response", ex);
             }
         }
     }
diff --git 
a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
 
b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
index 34be2e9..d5342e8 100644
--- 
a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
+++ 
b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
@@ -16,18 +16,31 @@
  */
 package org.apache.kafka.log4jappender;
 
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.helpers.LogLog;
+import org.easymock.EasyMock;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
-import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
 
 public class KafkaLog4jAppenderTest {
 
-    Logger logger = Logger.getLogger(KafkaLog4jAppenderTest.class);
+    private Logger logger = Logger.getLogger(KafkaLog4jAppenderTest.class);
+
+    @Before
+    public void setup() {
+        LogLog.setInternalDebugging(true);
+    }
 
     @Test
     public void testKafkaLog4jConfigs() {
@@ -66,22 +79,103 @@ public class KafkaLog4jAppenderTest {
 
 
     @Test
-    public void testLog4jAppends() throws UnsupportedEncodingException {
-        PropertyConfigurator.configure(getLog4jConfig());
+    public void testLog4jAppends() {
+        PropertyConfigurator.configure(getLog4jConfig(false));
 
         for (int i = 1; i <= 5; ++i) {
             logger.error(getMessage(i));
         }
 
         Assert.assertEquals(
-                5, ((MockKafkaLog4jAppender) 
(logger.getRootLogger().getAppender("KAFKA"))).getHistory().size());
+            5, (getMockKafkaLog4jAppender()).getHistory().size());
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void 
testLog4jAppendsWithSyncSendAndSimulateProducerFailShouldThrowException() {
+        Properties props = getLog4jConfig(true);
+        props.put("log4j.appender.KAFKA.IgnoreExceptions", "false");
+        PropertyConfigurator.configure(props);
+
+        MockKafkaLog4jAppender mockKafkaLog4jAppender = 
getMockKafkaLog4jAppender();
+        replaceProducerWithMocked(mockKafkaLog4jAppender, false);
+
+        logger.error(getMessage(0));
+    }
+
+    @Test
+    public void 
testLog4jAppendsWithSyncSendWithoutIgnoringExceptionsShouldNotThrowException() {
+        Properties props = getLog4jConfig(true);
+        props.put("log4j.appender.KAFKA.IgnoreExceptions", "false");
+        PropertyConfigurator.configure(props);
+
+        MockKafkaLog4jAppender mockKafkaLog4jAppender = 
getMockKafkaLog4jAppender();
+        replaceProducerWithMocked(mockKafkaLog4jAppender, true);
+
+        logger.error(getMessage(0));
+    }
+
+    @Test
+    public void 
testLog4jAppendsWithRealProducerConfigWithSyncSendShouldNotThrowException() {
+        Properties props = getLog4jConfigWithRealProducer(true);
+        PropertyConfigurator.configure(props);
+
+        logger.error(getMessage(0));
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void 
testLog4jAppendsWithRealProducerConfigWithSyncSendAndNotIgnoringExceptionsShouldThrowException()
 {
+        Properties props = getLog4jConfigWithRealProducer(false);
+        PropertyConfigurator.configure(props);
+
+        logger.error(getMessage(0));
     }
 
-    private byte[] getMessage(int i) throws UnsupportedEncodingException {
-        return ("test_" + i).getBytes("UTF-8");
+    private void replaceProducerWithMocked(MockKafkaLog4jAppender 
mockKafkaLog4jAppender, boolean success) {
+        @SuppressWarnings("unchecked")
+        MockProducer<byte[], byte[]> producer = 
EasyMock.niceMock(MockProducer.class);
+        @SuppressWarnings("unchecked")
+        Future<RecordMetadata> futureMock = EasyMock.niceMock(Future.class);
+        try {
+            if (!success)
+                EasyMock.expect(futureMock.get())
+                    .andThrow(new ExecutionException("simulated timeout", new 
TimeoutException()));
+        } catch (InterruptedException | ExecutionException e) {
+            // just mocking
+        }
+        
EasyMock.expect(producer.send(EasyMock.anyObject())).andReturn(futureMock);
+        EasyMock.replay(producer, futureMock);
+        // reconfiguring mock appender
+        mockKafkaLog4jAppender.setKafkaProducer(producer);
+        mockKafkaLog4jAppender.activateOptions();
+    }
+
+    private MockKafkaLog4jAppender getMockKafkaLog4jAppender() {
+        return (MockKafkaLog4jAppender) 
Logger.getRootLogger().getAppender("KAFKA");
+    }
+
+    private byte[] getMessage(int i) {
+        return ("test_" + i).getBytes(StandardCharsets.UTF_8);
+    }
+
+    private Properties getLog4jConfigWithRealProducer(boolean 
ignoreExceptions) {
+        Properties props = new Properties();
+        props.put("log4j.rootLogger", "INFO, KAFKA");
+        props.put("log4j.appender.KAFKA", 
"org.apache.kafka.log4jappender.KafkaLog4jAppender");
+        props.put("log4j.appender.KAFKA.layout", 
"org.apache.log4j.PatternLayout");
+        props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - 
%m%n");
+        props.put("log4j.appender.KAFKA.BrokerList", "127.0.0.2:9093");
+        props.put("log4j.appender.KAFKA.Topic", "test-topic");
+        props.put("log4j.appender.KAFKA.RequiredNumAcks", "1");
+        props.put("log4j.appender.KAFKA.SyncSend", "true");
+        // setting producer timeout (max.block.ms) to be low
+        props.put("log4j.appender.KAFKA.maxBlockMs", "10");
+        // ignoring exceptions
+        props.put("log4j.appender.KAFKA.IgnoreExceptions", 
Boolean.toString(ignoreExceptions));
+        props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
+        return props;
     }
 
-    private Properties getLog4jConfig() {
+    private Properties getLog4jConfig(boolean syncSend) {
         Properties props = new Properties();
         props.put("log4j.rootLogger", "INFO, KAFKA");
         props.put("log4j.appender.KAFKA", 
"org.apache.kafka.log4jappender.MockKafkaLog4jAppender");
@@ -90,7 +184,7 @@ public class KafkaLog4jAppenderTest {
         props.put("log4j.appender.KAFKA.BrokerList", "127.0.0.1:9093");
         props.put("log4j.appender.KAFKA.Topic", "test-topic");
         props.put("log4j.appender.KAFKA.RequiredNumAcks", "1");
-        props.put("log4j.appender.KAFKA.SyncSend", "false");
+        props.put("log4j.appender.KAFKA.SyncSend", Boolean.toString(syncSend));
         props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
         return props;
     }
diff --git 
a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java
 
b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java
index 8040be4..a9eb5fb 100644
--- 
a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java
+++ 
b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java
@@ -34,6 +34,10 @@ public class MockKafkaLog4jAppender extends 
KafkaLog4jAppender {
         return mockProducer;
     }
 
+    void setKafkaProducer(MockProducer<byte[], byte[]> producer) {
+        this.mockProducer = producer;
+    }
+
     @Override
     protected void append(LoggingEvent event) {
         if (super.getProducer() == null) {
@@ -42,7 +46,7 @@ public class MockKafkaLog4jAppender extends 
KafkaLog4jAppender {
         super.append(event);
     }
 
-    protected List<ProducerRecord<byte[], byte[]>> getHistory() {
+    List<ProducerRecord<byte[], byte[]>> getHistory() {
         return mockProducer.history();
     }
 }

Reply via email to