This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 973fa3a  Introduce a pulsar log4j2 appender (#1316)
973fa3a is described below

commit 973fa3ac35394e73ca7a950afac4f908633ff23c
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Fri Mar 2 13:07:21 2018 -0800

    Introduce a pulsar log4j2 appender (#1316)
    
    ### Motivation
    
    When developing `pulsar-functions`, it is useful to publish the log
    messages generated by functions to Pulsar, so that users can tail the
    log topic to see all the messages of each function.
    
    ### Modifications
    
    This PR adds a Pulsar-based log4j2 appender.
    
    ### Result
    
    We are able to use pulsar log4j2 appender to log messages.
---
 pom.xml                                            |  25 +++
 pulsar-log4j2-appender/pom.xml                     |  74 +++++++
 .../pulsar/log4j2/appender/PulsarAppender.java     | 211 ++++++++++++++++++++
 .../pulsar/log4j2/appender/PulsarManager.java      | 145 ++++++++++++++
 .../pulsar/log4j2/appender/PulsarAppenderTest.java | 218 +++++++++++++++++++++
 .../builder/ConfigurationAssemblerTest.java        | 115 +++++++++++
 .../appender/builder/ConfigurationBuilderTest.java | 120 ++++++++++++
 .../builder/CustomConfigurationFactory.java        |  88 +++++++++
 .../src/test/resources/PulsarAppenderTest.xml      |  49 +++++
 9 files changed, 1045 insertions(+)

diff --git a/pom.xml b/pom.xml
index ada1a71..c8131e6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,6 +99,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <module>all</module>
     <module>docker</module>
     <module>tests</module>
+    <module>pulsar-log4j2-appender</module>
   </modules>
 
   <issueManagement>
@@ -126,6 +127,9 @@ flexible messaging model and an intuitive client 
API.</description>
     <jackson.version>2.8.4</jackson.version>
     <puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version>
     <dockerfile-maven.version>1.3.7</dockerfile-maven.version>
+
+    <!-- test dependencies -->
+    <disruptor.version>3.4.0</disruptor.version>
   </properties>
 
   <dependencyManagement>
@@ -363,6 +367,19 @@ flexible messaging model and an intuitive client 
API.</description>
       </dependency>
 
       <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-api</artifactId>
+        <type>test-jar</type>
+        <version>${log4j2.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-core</artifactId>
+        <type>test-jar</type>
+        <version>${log4j2.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>commons-io</groupId>
         <artifactId>commons-io</artifactId>
         <version>2.5</version>
@@ -599,6 +616,14 @@ flexible messaging model and an intuitive client 
API.</description>
         <artifactId>bcpkix-jdk15on</artifactId>
         <version>${bouncycastle.version}</version>
       </dependency>
+
+      <!-- test dependencies -->
+      <dependency>
+        <groupId>com.lmax</groupId>
+        <artifactId>disruptor</artifactId>
+        <version>${disruptor.version}</version>
+      </dependency>
+
     </dependencies>
   </dependencyManagement>
 
diff --git a/pulsar-log4j2-appender/pom.xml b/pulsar-log4j2-appender/pom.xml
new file mode 100644
index 0000000..9352457
--- /dev/null
+++ b/pulsar-log4j2-appender/pom.xml
@@ -0,0 +1,74 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<!-- Maven module for the log4j2 appender that publishes log events to Pulsar. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+       <parent>
+               <groupId>org.apache.pulsar</groupId>
+               <artifactId>pulsar</artifactId>
+               <version>2.0.0-incubating-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>pulsar-log4j2-appender</artifactId>
+       <name>Pulsar Log4j2 Appender</name>
+       <description>Pulsar Log4j2 Appender</description>
+
+       <dependencies>
+               <!-- Pulsar client used by the appender to produce messages -->
+               <dependency>
+                       <groupId>${project.groupId}</groupId>
+                       <artifactId>pulsar-client</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <!-- log4j2 artifacts; no version is declared here -->
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-api</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-core</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-slf4j-impl</artifactId>
+               </dependency>
+
+               <!-- log4j2 test-jars (versions managed by the parent pom), test scope only -->
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-api</artifactId>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-core</artifactId>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <!-- Required for AsyncLoggers -->
+               <dependency>
+                       <groupId>com.lmax</groupId>
+                       <artifactId>disruptor</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+</project>
diff --git 
a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
 
b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
new file mode 100644
index 0000000..68bfbd5
--- /dev/null
+++ 
b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.core.AbstractLifeCycle;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.Node;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
+import org.apache.logging.log4j.core.config.plugins.PluginElement;
+import org.apache.logging.log4j.core.layout.SerializedLayout;
+
+/**
+ * The PulsarAppender logs events to an Apache Pulsar topic.
+ *
+ * <p>Each log event is sent as a Pulsar record.
+ */
+@Plugin(
+    name = "Pulsar",
+    category = Node.CATEGORY,
+    elementType = Appender.ELEMENT_TYPE,
+    printObject = true)
+public final class PulsarAppender extends AbstractAppender {
+
+    /**
+     * Builds PulsarAppender instances.
+     * @param <B> The type to build
+     */
+    public static class Builder<B extends Builder<B>> extends 
AbstractAppender.Builder<B>
+            implements 
org.apache.logging.log4j.core.util.Builder<PulsarAppender> {
+
+        @PluginAttribute("key")
+        private String key;
+
+        @PluginAttribute("topic")
+        private String topic;
+
+        @PluginAttribute("serviceUrl")
+        private String serviceUrl;
+
+        @PluginAttribute(value = "avoidRecursive", defaultBoolean = true)
+        private boolean avoidRecursive;
+
+        @PluginAttribute(value = "syncSend", defaultBoolean = false)
+        private boolean syncSend;
+
+        @PluginElement("Properties")
+        private Property[] properties;
+
+        @SuppressWarnings("resource")
+        @Override
+        public PulsarAppender build() {
+            final Layout<? extends Serializable> layout = getLayout();
+            if (layout == null) {
+                AbstractLifeCycle.LOGGER.error("No layout provided for 
PulsarAppender");
+                return null;
+            }
+            PulsarManager manager = new PulsarManager(
+                getConfiguration().getLoggerContext(),
+                getName(),
+                serviceUrl,
+                topic,
+                syncSend,
+                properties,
+                key);
+            return new PulsarAppender(
+                getName(),
+                layout,
+                getFilter(),
+                isIgnoreExceptions(),
+                avoidRecursive,
+                manager);
+        }
+
+        public String getTopic() {
+            return topic;
+        }
+
+        public boolean isSyncSend() {
+            return syncSend;
+        }
+
+        public Property[] getProperties() {
+            return properties;
+        }
+
+        public B setTopic(final String topic) {
+            this.topic = topic;
+            return asBuilder();
+        }
+
+        public B setSyncSend(final boolean syncSend) {
+            this.syncSend = syncSend;
+            return asBuilder();
+        }
+
+        public B setProperties(final Property[] properties) {
+            this.properties = properties;
+            return asBuilder();
+        }
+    }
+
+    /**
+     * Creates a builder for a PulsarAppender.
+     * @return a builder for a PulsarAppender.
+     */
+    @PluginBuilderFactory
+    public static <B extends Builder<B>> B newBuilder() {
+        return new Builder<B>().asBuilder();
+    }
+
+    private final boolean avoidRecursive;
+    private final PulsarManager manager;
+
+    private PulsarAppender(
+            final String name,
+            final Layout<? extends Serializable> layout,
+            final Filter filter,
+            final boolean ignoreExceptions,
+            final boolean avoidRecursive,
+            final PulsarManager manager) {
+        super(name, filter, layout, ignoreExceptions);
+        this.avoidRecursive = avoidRecursive;
+        this.manager = Objects.requireNonNull(manager, "manager");
+    }
+
+    @Override
+    public void append(final LogEvent event) {
+        if (avoidRecursive
+            && event.getLoggerName() != null
+            && event.getLoggerName().startsWith("org.apache.pulsar")) {
+            LOGGER.warn("Recursive logging from [{}] for appender [{}].", 
event.getLoggerName(), getName());
+        } else {
+            try {
+                tryAppend(event);
+            } catch (final Exception e) {
+                error("Unable to write to Pulsar in appender [" + getName() + 
"]", event, e);
+            }
+        }
+    }
+
+    private void tryAppend(final LogEvent event) {
+        final Layout<? extends Serializable> layout = getLayout();
+        byte[] data;
+        if (layout instanceof SerializedLayout) {
+            final byte[] header = layout.getHeader();
+            final byte[] body = layout.toByteArray(event);
+            data = new byte[header.length + body.length];
+            System.arraycopy(header, 0, data, 0, header.length);
+            System.arraycopy(body, 0, data, header.length, body.length);
+        } else {
+            data = layout.toByteArray(event);
+        }
+        manager.send(data);
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        try {
+            manager.startup();
+        } catch (Exception e) {
+            // fail to start the manager
+        }
+    }
+
+    @Override
+    public boolean stop(final long timeout, final TimeUnit timeUnit) {
+        setStopping();
+        boolean stopped = super.stop(timeout, timeUnit, false);
+        stopped &= manager.stop(timeout, timeUnit);
+        setStopped();
+        return stopped;
+    }
+
+    @Override
+    public String toString() {
+        return "PulsarAppender{" +
+            "name=" + getName() +
+            ", state=" + getState() +
+            ", serviceUrl=" + manager.getServiceUrl() +
+            ", topic=" + manager.getTopic() +
+            '}';
+    }
+
+}
diff --git 
a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarManager.java
 
b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarManager.java
new file mode 100644
index 0000000..c52faa6
--- /dev/null
+++ 
b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarManager.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.AbstractManager;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * Owns the Pulsar client and producer used by {@link PulsarAppender} and sends
+ * serialized log events to the configured topic.
+ */
+public class PulsarManager extends AbstractManager {
+
+    // Factory hooks; package-private and non-final so that PulsarAppenderTest can
+    // substitute mocks for the real client and message builders.
+    static Supplier<ClientBuilder> PULSAR_CLIENT_BUILDER = () -> PulsarClient.builder();
+
+    static BiFunction<String, byte[], Message<byte[]>> MESSAGE_BUILDER = (key, data) -> {
+        MessageBuilder<byte[]> messageBuilder = MessageBuilder.create()
+            .setContent(data);
+        if (null != key) {
+            messageBuilder = messageBuilder.setKey(key);
+        }
+        return messageBuilder.build();
+    };
+
+    // Both stay null until startup() succeeds.
+    private PulsarClient client;
+    private Producer<byte[]> producer;
+
+    private final String serviceUrl;
+    private final String topic;
+    private final String key;
+    private final boolean syncSend;
+
+    /**
+     * @param loggerContext the logger context this manager belongs to
+     * @param name the manager name (the appender name)
+     * @param serviceUrl the Pulsar service URL; must not be null
+     * @param topic the topic to publish to; must not be null
+     * @param syncSend whether messages are sent synchronously
+     * @param properties accepted for configuration compatibility; currently not used
+     * @param key optional message key, may contain {@code ${...}} lookups
+     */
+    public PulsarManager(final LoggerContext loggerContext,
+                         final String name,
+                         final String serviceUrl,
+                         final String topic,
+                         final boolean syncSend,
+                         final Property[] properties,
+                         final String key) {
+        super(loggerContext, name);
+        this.serviceUrl = Objects.requireNonNull(serviceUrl, "serviceUrl");
+        this.topic = Objects.requireNonNull(topic, "topic");
+        this.syncSend = syncSend;
+        this.key = key;
+    }
+
+    /**
+     * Closes the producer and then the client, waiting up to the given timeout
+     * for each. Failures are logged and do not change the return value.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
+        if (producer != null) {
+            try {
+                producer.closeAsync().get(timeout, timeUnit);
+            } catch (Exception e) {
+                restoreInterrupt(e);
+                // exceptions on closing
+                LOGGER.warn("Failed to close producer within {} milliseconds",
+                    timeUnit.toMillis(timeout), e);
+            }
+        }
+        // The client was previously never closed, leaking its resources.
+        if (client != null) {
+            try {
+                client.closeAsync().get(timeout, timeUnit);
+            } catch (Exception e) {
+                restoreInterrupt(e);
+                LOGGER.warn("Failed to close client within {} milliseconds",
+                    timeUnit.toMillis(timeout), e);
+            }
+        }
+        return true;
+    }
+
+    // Re-asserts the thread's interrupt flag when the caught exception was an interrupt.
+    private static void restoreInterrupt(final Exception e) {
+        if (e instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Sends one serialized log event. Does nothing when no producer is available
+     * (i.e. {@link #startup()} has not completed successfully).
+     *
+     * @param msg the message payload
+     */
+    public void send(final byte[] msg)  {
+        if (producer == null) {
+            return;
+        }
+
+        // Resolve "${...}" lookups in the key against the current configuration.
+        String newKey = key;
+        if (key != null && key.contains("${")) {
+            newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key);
+        }
+
+        Message<byte[]> message = MESSAGE_BUILDER.apply(newKey, msg);
+        if (syncSend) {
+            try {
+                producer.send(message);
+            } catch (PulsarClientException e) {
+                LOGGER.error("Unable to write to Pulsar in appender [" + getName() + "]", e);
+            }
+        } else {
+            producer.sendAsync(message)
+                .exceptionally(cause -> {
+                    LOGGER.error("Unable to write to Pulsar in appender [" + getName() + "]", cause);
+                    return null;
+                });
+        }
+    }
+
+    /**
+     * Builds the Pulsar client and creates the producer for the topic.
+     *
+     * @throws Exception if the client or producer cannot be created; the failure
+     *         is logged and a client created along the way is closed first
+     */
+    public void startup() throws Exception {
+        try {
+            client = PULSAR_CLIENT_BUILDER.get()
+                .serviceUrl(serviceUrl)
+                .build();
+            ProducerBuilder<byte[]> producerBuilder = client.newProducer()
+                .topic(topic)
+                .producerName("pulsar-log4j2-appender-" + topic)
+                .blockIfQueueFull(false);
+            if (syncSend) {
+                // disable batching for sync send
+                producerBuilder = producerBuilder.enableBatching(false);
+            } else {
+                // enable batching in 10 ms for async send
+                producerBuilder = producerBuilder
+                    .enableBatching(true)
+                    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS);
+            }
+            producer = producerBuilder.create();
+        } catch (Exception t) {
+            // Pass the name for the placeholder; the trailing throwable is logged with its stack trace.
+            LOGGER.error("Failed to start pulsar manager {}", getName(), t);
+            closeClientAfterFailedStartup();
+            throw t;
+        }
+    }
+
+    // Releases a client that was built before producer creation failed.
+    private void closeClientAfterFailedStartup() {
+        if (client == null) {
+            return;
+        }
+        try {
+            client.close();
+        } catch (Exception e) {
+            LOGGER.warn("Failed to close client after unsuccessful startup of pulsar manager {}", getName(), e);
+        } finally {
+            client = null;
+        }
+    }
+
+    public String getServiceUrl() {
+        return serviceUrl;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+}
diff --git 
a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
 
b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
new file mode 100644
index 0000000..4431243
--- /dev/null
+++ 
b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertNotNull;
+import static org.testng.AssertJUnit.assertNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.message.SimpleMessage;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class PulsarAppenderTest {
+
+    private static final String LOG_MESSAGE = "Hello, world!";
+    private static final String TOPIC_NAME = "pulsar-topic";
+
+    private static Log4jLogEvent createLogEvent() {
+        return Log4jLogEvent.newBuilder()
+            .setLoggerName(PulsarAppenderTest.class.getName())
+            .setLoggerFqcn(PulsarAppenderTest.class.getName())
+            .setLevel(Level.INFO)
+            .setMessage(new SimpleMessage(LOG_MESSAGE))
+            .build();
+    }
+
+    private ClientBuilderImpl clientBuilder;
+    private PulsarClient client;
+    private Producer<byte[]> producer;
+    private List<Message<byte[]>> history;
+
+    private LoggerContext ctx;
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        history = new LinkedList<>();
+
+        client = mock(PulsarClient.class);
+        producer = mock(Producer.class);
+        clientBuilder = mock(ClientBuilderImpl.class);
+
+        doReturn(client).when(clientBuilder).build();
+        doReturn(clientBuilder).when(clientBuilder).serviceUrl(anyString());
+
+        ProducerBuilder<byte[]> producerBuilder = mock(ProducerBuilder.class);
+        when(client.newProducer()).thenReturn(producerBuilder);
+        doReturn(producerBuilder).when(producerBuilder).topic(anyString());
+        
doReturn(producerBuilder).when(producerBuilder).producerName(anyString());
+        
doReturn(producerBuilder).when(producerBuilder).enableBatching(anyBoolean());
+        
doReturn(producerBuilder).when(producerBuilder).batchingMaxPublishDelay(anyInt(),
 any(TimeUnit.class));
+        
doReturn(producerBuilder).when(producerBuilder).blockIfQueueFull(anyBoolean());
+        doReturn(producer).when(producerBuilder).create();
+
+        when(producer.send(any(Message.class)))
+            .thenAnswer(invocationOnMock -> {
+                Message<byte[]> msg = invocationOnMock.getArgumentAt(0, 
Message.class);
+                synchronized (history) {
+                    history.add(msg);
+                }
+                return null;
+            });
+
+        when(producer.sendAsync(any(Message.class)))
+            .thenAnswer(invocationOnMock -> {
+                Message<byte[]> msg = invocationOnMock.getArgumentAt(0, 
Message.class);
+                synchronized (history) {
+                    history.add(msg);
+                }
+                CompletableFuture<MessageId> future = new 
CompletableFuture<>();
+                future.complete(mock(MessageId.class));
+                return future;
+            });
+
+        PulsarManager.PULSAR_CLIENT_BUILDER = () -> clientBuilder;
+        PulsarManager.MESSAGE_BUILDER = (key, data) -> {
+            Message<byte[]> msg = mock(Message.class);
+            when(msg.getKey()).thenReturn(key);
+            when(msg.getData()).thenReturn(data);
+            return msg;
+        };
+
+        ctx = Configurator.initialize(
+            "PulsarAppenderTest",
+            getClass().getClassLoader(),
+            
getClass().getClassLoader().getResource("PulsarAppenderTest.xml").toURI());
+    }
+
+    @Test
+    public void testAppendWithLayout() throws Exception {
+        final Appender appender = 
ctx.getConfiguration().getAppender("PulsarAppenderWithLayout");
+        appender.append(createLogEvent());
+        final Message<byte[]> item;
+        synchronized (history) {
+            assertEquals(1, history.size());
+            item = history.get(0);
+        }
+        assertNotNull(item);
+        assertNull(item.getKey());
+        assertEquals("[" + LOG_MESSAGE + "]", new String(item.getData(), 
StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testAppendWithSerializedLayout() throws Exception {
+        final Appender appender = 
ctx.getConfiguration().getAppender("PulsarAppenderWithSerializedLayout");
+        final LogEvent logEvent = createLogEvent();
+        appender.append(logEvent);
+        final Message<byte[]> item;
+        synchronized (history) {
+            assertEquals(1, history.size());
+            item = history.get(0);
+        }
+        assertNotNull(item);
+        assertNull(item.getKey());
+        assertEquals(LOG_MESSAGE, 
deserializeLogEvent(item.getData()).getMessage().getFormattedMessage());
+    }
+
+    @Test
+    public void testAsyncAppend() throws Exception {
+        final Appender appender = 
ctx.getConfiguration().getAppender("AsyncPulsarAppender");
+        appender.append(createLogEvent());
+        final Message<byte[]> item;
+        synchronized (history) {
+            assertEquals(1, history.size());
+            item = history.get(0);
+        }
+        assertNotNull(item);
+        assertNull(item.getKey());
+        assertEquals(LOG_MESSAGE, new String(item.getData(), 
StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testAppendWithKey() throws Exception {
+        final Appender appender = 
ctx.getConfiguration().getAppender("PulsarAppenderWithKey");
+        final LogEvent logEvent = createLogEvent();
+        appender.append(logEvent);
+        Message<byte[]> item;
+        synchronized (history) {
+            assertEquals(1, history.size());
+            item = history.get(0);
+        }
+        assertNotNull(item);
+        String msgKey = item.getKey();
+        assertEquals(msgKey, "key");
+        assertEquals(LOG_MESSAGE, new String(item.getData(), 
StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testAppendWithKeyLookup() throws Exception {
+        final Appender appender = 
ctx.getConfiguration().getAppender("PulsarAppenderWithKeyLookup");
+        final LogEvent logEvent = createLogEvent();
+        Date date = new Date();
+        SimpleDateFormat format = new SimpleDateFormat("dd-MM-yyyy");
+        appender.append(logEvent);
+        Message<byte[]> item;
+        synchronized (history) {
+            assertEquals(1, history.size());
+            item = history.get(0);
+        }
+        assertNotNull(item);
+        String keyValue = format.format(date);
+        assertEquals(item.getKey(), keyValue);
+        assertEquals(LOG_MESSAGE, new String(item.getData(), 
StandardCharsets.UTF_8));
+    }
+
+    private LogEvent deserializeLogEvent(final byte[] data) throws 
IOException, ClassNotFoundException {
+        final ByteArrayInputStream bis = new ByteArrayInputStream(data);
+        try (ObjectInput ois = new ObjectInputStream(bis)) {
+            return (LogEvent) ois.readObject();
+        }
+    }
+
+}
\ No newline at end of file
diff --git 
a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationAssemblerTest.java
 
b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationAssemblerTest.java
new file mode 100644
index 0000000..a234741
--- /dev/null
+++ 
b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationAssemblerTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender.builder;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.LifeCycle;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.ConfigurationFactory;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.logging.log4j.core.config.CustomLevelConfig;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder;
+import 
org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory;
+import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
+import org.apache.logging.log4j.core.filter.ThresholdFilter;
+import org.apache.logging.log4j.core.layout.GelfLayout;
+import org.apache.logging.log4j.core.util.Constants;
+import org.apache.pulsar.log4j2.appender.PulsarAppender;
+import org.testng.annotations.Test;
+
+public class ConfigurationAssemblerTest {
+
+    /**
+     * Builds a configuration programmatically from the shared test fixtures, initializes a
+     * logger context with it while the async context selector property is set, and validates
+     * the result. The selector property is removed again afterwards.
+     */
+    @Test
+    public void testBuildConfiguration() throws Exception {
+        try {
+            System.setProperty(Constants.LOG4J_CONTEXT_SELECTOR,
+                    "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
+            final ConfigurationBuilder<BuiltConfiguration> configBuilder =
+                    ConfigurationBuilderFactory.newConfigurationBuilder();
+            CustomConfigurationFactory.addTestFixtures("config name", configBuilder);
+            final Configuration built = configBuilder.build();
+            // The context is only needed to start the configuration; it is closed after validation.
+            try (LoggerContext loggerContext = Configurator.initialize(built)) {
+                validate(built);
+            }
+        } finally {
+            System.clearProperty(Constants.LOG4J_CONTEXT_SELECTOR);
+        }
+    }
+
+    /**
+     * Selects {@code CustomConfigurationFactory} and the async context selector through system
+     * properties, obtains the resulting logger context configuration and validates it. Both
+     * properties are removed again afterwards.
+     */
+    @Test
+    public void testCustomConfigurationFactory() throws Exception {
+        try {
+            System.setProperty(ConfigurationFactory.CONFIGURATION_FACTORY_PROPERTY,
+                    "org.apache.pulsar.log4j2.appender.builder.CustomConfigurationFactory");
+            System.setProperty(Constants.LOG4J_CONTEXT_SELECTOR,
+                    "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
+            final LoggerContext context = (LoggerContext) LogManager.getContext(false);
+            validate(context.getConfiguration());
+        } finally {
+            System.clearProperty(Constants.LOG4J_CONTEXT_SELECTOR);
+            System.clearProperty(ConfigurationFactory.CONFIGURATION_FACTORY_PROPERTY);
+        }
+    }
+
+    /**
+     * Asserts that the given configuration is started and contains what the tests expect:
+     * two appenders (one named "Pulsar" using a {@link GelfLayout}), two logger configs
+     * (root at ERROR without location, "org.apache.logging.log4j" at DEBUG with location),
+     * a {@link ThresholdFilter} and a single custom level "Panic" with int level 17.
+     * Finally logs one message through the active configuration.
+     *
+     * @param config the configuration to check
+     */
+    private void validate(final Configuration config) {
+        // Null-check first so a missing configuration fails with a message instead of an NPE.
+        assertNotNull(config, "No configuration created");
+        assertNotNull(config.getName());
+        assertFalse(config.getName().isEmpty());
+        assertEquals("Incorrect State: " + config.getState(), LifeCycle.State.STARTED, config.getState());
+
+        final Map<String, Appender> appenders = config.getAppenders();
+        assertNotNull(appenders);
+        assertEquals("Incorrect number of Appenders: " + appenders.size(), 2, appenders.size());
+        // Explicit type checks replace the former unused casts, which only failed via ClassCastException.
+        final Appender pulsarAppender = appenders.get("Pulsar");
+        assertNotNull(pulsarAppender, "No Pulsar appender");
+        assertTrue("Not a PulsarAppender", pulsarAppender instanceof PulsarAppender);
+        assertTrue("Not a GelfLayout", pulsarAppender.getLayout() instanceof GelfLayout);
+
+        final Map<String, LoggerConfig> loggers = config.getLoggers();
+        assertNotNull(loggers);
+        assertEquals("Incorrect number of LoggerConfigs: " + loggers.size(), 2, loggers.size());
+        final LoggerConfig rootLoggerConfig = loggers.get("");
+        assertNotNull(rootLoggerConfig, "No root LoggerConfig");
+        assertEquals(Level.ERROR, rootLoggerConfig.getLevel());
+        assertFalse(rootLoggerConfig.isIncludeLocation());
+        final LoggerConfig loggerConfig = loggers.get("org.apache.logging.log4j");
+        assertNotNull(loggerConfig, "No LoggerConfig for org.apache.logging.log4j");
+        assertEquals(Level.DEBUG, loggerConfig.getLevel());
+        assertTrue(loggerConfig.isIncludeLocation());
+
+        final Filter filter = config.getFilter();
+        assertNotNull(filter, "No Filter");
+        assertTrue("Not a Threshold Filter", filter instanceof ThresholdFilter);
+
+        final List<CustomLevelConfig> customLevels = config.getCustomLevels();
+        // Previously this re-checked 'filter', so a null custom level list went undetected here.
+        assertNotNull(customLevels, "No CustomLevels");
+        assertEquals(1, customLevels.size());
+        final CustomLevelConfig customLevel = customLevels.get(0);
+        assertEquals("Panic", customLevel.getLevelName());
+        assertEquals(17, customLevel.getIntLevel());
+
+        final Logger logger = LogManager.getLogger(getClass());
+        logger.info("Welcome to Log4j!");
+    }
+}
diff --git 
a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationBuilderTest.java
 
b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationBuilderTest.java
new file mode 100644
index 0000000..7de8a3f
--- /dev/null
+++ 
b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationBuilderTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender.builder;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.appender.ConsoleAppender;
+import 
org.apache.logging.log4j.core.config.builder.api.AppenderComponentBuilder;
+import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder;
+import 
org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory;
+import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
+import org.testng.annotations.Test;
+
+public class ConfigurationBuilderTest {
+
+    private static final String INDENT = "  ";
+    private static final String EOL = System.lineSeparator();
+
+    private void addTestFixtures(final String name, final 
ConfigurationBuilder<BuiltConfiguration> builder) {
+        builder.setConfigurationName(name);
+        builder.setStatusLevel(Level.ERROR);
+        builder.setShutdownTimeout(5000, TimeUnit.MILLISECONDS);
+        
builder.add(builder.newScriptFile("target/test-classes/scripts/filter.groovy").addIsWatched(true));
+        builder.add(builder.newFilter("ThresholdFilter", Filter.Result.ACCEPT, 
Filter.Result.NEUTRAL)
+                .addAttribute("level", Level.DEBUG));
+
+        final AppenderComponentBuilder appenderBuilder = 
builder.newAppender("Stdout", "CONSOLE").addAttribute("target", 
ConsoleAppender.Target.SYSTEM_OUT);
+        appenderBuilder.add(builder.newLayout("PatternLayout").
+                addAttribute("pattern", "%d [%t] %-5level: %msg%n%throwable"));
+        appenderBuilder.add(builder.newFilter("MarkerFilter", 
Filter.Result.DENY,
+                Filter.Result.NEUTRAL).addAttribute("marker", "FLOW"));
+        builder.add(appenderBuilder);
+
+        final AppenderComponentBuilder appenderBuilder2 = 
builder.newAppender("Pulsar", "Pulsar")
+            .addAttribute("serviceUrl", "pulsar://localhost:6650")
+            .addAttribute("topic", "my-topic");
+        appenderBuilder2.add(builder.newLayout("GelfLayout").
+            addAttribute("host", "my-host").
+            addComponent(builder.newKeyValuePair("extraField", "extraValue")));
+        builder.add(appenderBuilder2);
+
+        builder.add(builder.newLogger("org.apache.logging.log4j", Level.DEBUG, 
true).
+                    add(builder.newAppenderRef("Stdout")).
+                    addAttribute("additivity", false));
+        builder.add(builder.newLogger("org.apache.logging.log4j.core", 
Level.DEBUG).
+                    add(builder.newAppenderRef("Stdout")));
+        
builder.add(builder.newRootLogger(Level.ERROR).add(builder.newAppenderRef("Stdout")));
+
+        builder.addProperty("MyKey", "MyValue");
+        builder.add(builder.newCustomLevel("Panic", 17));
+        builder.setPackages("foo,bar");
+    }
+
+    private final static String expectedXml =
+            "<?xml version=\"1.0\" ?>" + EOL +
+            "<Configuration name=\"config name\" status=\"ERROR\" 
packages=\"foo,bar\" shutdownTimeout=\"5000\">" + EOL +
+                INDENT + "<Properties>" + EOL +
+                INDENT + INDENT + "<Property 
name=\"MyKey\">MyValue</Property>" + EOL +
+                INDENT + "</Properties>" + EOL +
+                INDENT + "<Scripts>" + EOL +
+                INDENT + INDENT + "<ScriptFile 
name=\"target/test-classes/scripts/filter.groovy\" 
path=\"target/test-classes/scripts/filter.groovy\" isWatched=\"true\"/>" + EOL +
+                INDENT + "</Scripts>" + EOL +
+                INDENT + "<CustomLevels>" + EOL +
+                INDENT + INDENT + "<CustomLevel name=\"Panic\" 
intLevel=\"17\"/>" + EOL +
+                INDENT + "</CustomLevels>" + EOL +
+                INDENT + "<ThresholdFilter onMatch=\"ACCEPT\" 
onMisMatch=\"NEUTRAL\" level=\"DEBUG\"/>" + EOL +
+                INDENT + "<Appenders>" + EOL +
+                INDENT + INDENT + "<CONSOLE name=\"Stdout\" 
target=\"SYSTEM_OUT\">" + EOL +
+                INDENT + INDENT + INDENT + "<PatternLayout pattern=\"%d [%t] 
%-5level: %msg%n%throwable\"/>" + EOL +
+                INDENT + INDENT + INDENT + "<MarkerFilter onMatch=\"DENY\" 
onMisMatch=\"NEUTRAL\" marker=\"FLOW\"/>" + EOL +
+                INDENT + INDENT + "</CONSOLE>" + EOL +
+                INDENT + INDENT + "<Pulsar name=\"Pulsar\" 
serviceUrl=\"pulsar://localhost:6650\" topic=\"my-topic\">" + EOL +
+                INDENT + INDENT + INDENT + "<GelfLayout host=\"my-host\">" + 
EOL +
+                INDENT + INDENT + INDENT + INDENT + "<KeyValuePair 
key=\"extraField\" value=\"extraValue\"/>" + EOL +
+                INDENT + INDENT + INDENT + "</GelfLayout>" + EOL +
+                INDENT + INDENT + "</Pulsar>" + EOL +
+                INDENT + "</Appenders>" + EOL +
+                INDENT + "<Loggers>" + EOL +
+                INDENT + INDENT + "<Logger name=\"org.apache.logging.log4j\" 
level=\"DEBUG\" includeLocation=\"true\" additivity=\"false\">" + EOL +
+                INDENT + INDENT + INDENT + "<AppenderRef ref=\"Stdout\"/>" + 
EOL +
+                INDENT + INDENT + "</Logger>" + EOL +
+                INDENT + INDENT + "<Logger 
name=\"org.apache.logging.log4j.core\" level=\"DEBUG\">" + EOL +
+                INDENT + INDENT + INDENT + "<AppenderRef ref=\"Stdout\"/>" + 
EOL +
+                INDENT + INDENT + "</Logger>" + EOL +
+                INDENT + INDENT + "<Root level=\"ERROR\">" + EOL +
+                INDENT + INDENT + INDENT + "<AppenderRef ref=\"Stdout\"/>" + 
EOL +
+                INDENT + INDENT + "</Root>" + EOL +
+                INDENT + "</Loggers>" + EOL +
+            "</Configuration>" + EOL;
+
+    /**
+     * Renders the fixture configuration to XML and compares it with {@code expectedXml}.
+     */
+    // TODO make test run properly on Windows
+    @Test
+    public void testXmlConstructing() throws Exception {
+        //assumeTrue(System.lineSeparator().length() == 1); // Only run test on platforms with single character line endings (such as Linux), not on Windows
+        final ConfigurationBuilder<BuiltConfiguration> builder = ConfigurationBuilderFactory.newConfigurationBuilder();
+        addTestFixtures("config name", builder);
+        final String xmlConfiguration = builder.toXmlConfiguration();
+        // org.testng.Assert.assertEquals takes (actual, expected); the arguments used to be
+        // reversed, which swapped "expected" and "found" in the failure message.
+        assertEquals(xmlConfiguration, expectedXml);
+    }
+
+}
diff --git 
a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/CustomConfigurationFactory.java
 
b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/CustomConfigurationFactory.java
new file mode 100644
index 0000000..98b26d0
--- /dev/null
+++ 
b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/CustomConfigurationFactory.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender.builder;
+
+import java.net.URI;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.ConsoleAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.ConfigurationFactory;
+import org.apache.logging.log4j.core.config.ConfigurationSource;
+import 
org.apache.logging.log4j.core.config.builder.api.AppenderComponentBuilder;
+import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder;
+import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
+
+/**
+ * Normally this would be a plugin. However, we don't want it used for 
everything so it will be defined
+ * via a system property.
+ */
+//@Plugin(name = "CustomConfigurationFactory", category = 
ConfigurationFactory.CATEGORY)
+//@Order(50)
+public class CustomConfigurationFactory extends ConfigurationFactory {
+
+    static Configuration addTestFixtures(final String name, final 
ConfigurationBuilder<BuiltConfiguration> builder) {
+        builder.setConfigurationName(name);
+        builder.setStatusLevel(Level.ERROR);
+        
builder.add(builder.newScriptFile("target/test-classes/scripts/filter.groovy").addIsWatched(true));
+        builder.add(builder.newFilter("ThresholdFilter", Filter.Result.ACCEPT, 
Filter.Result.NEUTRAL)
+                .addAttribute("level", Level.DEBUG));
+
+        final AppenderComponentBuilder appenderBuilder = 
builder.newAppender("Stdout", "CONSOLE").addAttribute("target", 
ConsoleAppender.Target.SYSTEM_OUT);
+        appenderBuilder.add(builder.newLayout("PatternLayout").
+                addAttribute("pattern", "%d [%t] %-5level: %msg%n%throwable"));
+        appenderBuilder.add(builder.newFilter("MarkerFilter", 
Filter.Result.DENY,
+                Filter.Result.NEUTRAL).addAttribute("marker", "FLOW"));
+        builder.add(appenderBuilder);
+
+        final AppenderComponentBuilder appenderBuilder2 = 
builder.newAppender("Pulsar", "Pulsar")
+            .addAttribute("serviceUrl", "pulsar://localhost:6650")
+            .addAttribute("topic", "my-topic");
+        appenderBuilder2.add(builder.newLayout("GelfLayout").
+            addAttribute("host", "my-host").
+            addComponent(builder.newKeyValuePair("extraField", "extraValue")));
+        builder.add(appenderBuilder2);
+
+        builder.add(builder.newLogger("org.apache.logging.log4j", Level.DEBUG, 
true).
+                    add(builder.newAppenderRef("Stdout")).
+                    addAttribute("additivity", false));
+        
builder.add(builder.newRootLogger(Level.ERROR).add(builder.newAppenderRef("Stdout")));
+
+        builder.add(builder.newCustomLevel("Panic", 17));
+
+        return builder.build();
+    }
+
+    /**
+     * Delegates to the name-based overload, using the string form of {@code source} as the
+     * configuration name and passing no configuration location.
+     */
+    @Override
+    public Configuration getConfiguration(final LoggerContext loggerContext, final ConfigurationSource source) {
+        final String configName = source.toString();
+        final URI noLocation = null;
+        return getConfiguration(loggerContext, configName, noLocation);
+    }
+
+    /**
+     * Builds the fixture configuration under the given name. The logger context and the
+     * configuration location are not consulted.
+     */
+    @Override
+    public Configuration getConfiguration(final LoggerContext loggerContext, final String name, final URI configLocation) {
+        return addTestFixtures(name, newConfigurationBuilder());
+    }
+
+    @Override
+    protected String[] getSupportedTypes() {
+        return new String[] {"*"};
+    }
+}
diff --git a/pulsar-log4j2-appender/src/test/resources/PulsarAppenderTest.xml 
b/pulsar-log4j2-appender/src/test/resources/PulsarAppenderTest.xml
new file mode 100644
index 0000000..a5743fb
--- /dev/null
+++ b/pulsar-log4j2-appender/src/test/resources/PulsarAppenderTest.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<Configuration name="PulsarAppenderTest" status="OFF">
+  <Appenders>
+    <Pulsar name="PulsarAppenderWithLayout" 
serviceUrl="pulsar://localhost:6650" topic="persistent://t/c/n/pulsar-topic" 
avoidRecursive="false">
+      <PatternLayout pattern="[%m]"/>
+    </Pulsar>
+    <Pulsar name="PulsarAppenderWithSerializedLayout" 
serviceUrl="pulsar://localhost:6650"  topic="persistent://t/c/n/pulsar-topic" 
avoidRecursive="false">
+      <SerializedLayout/>
+    </Pulsar>
+    <Pulsar name="AsyncPulsarAppender" serviceUrl="pulsar://localhost:6650" 
topic="persistent://t/c/n/pulsar-topic" avoidRecursive="false">
+      <PatternLayout pattern="%m"/>
+      <Property name="syncSend">false</Property>
+    </Pulsar>
+    <Pulsar name="PulsarAppenderWithKey" serviceUrl="pulsar://localhost:6650" 
topic="persistent://t/c/n/pulsar-topic" key="key" avoidRecursive="false">
+      <PatternLayout pattern="%m"/>
+    </Pulsar>
+    <Pulsar name="PulsarAppenderWithKeyLookup" 
serviceUrl="pulsar://localhost:6650" topic="persistent://t/c/n/pulsar-topic" 
key="$${date:dd-MM-yyyy}" avoidRecursive="false">
+      <PatternLayout pattern="%m"/>
+    </Pulsar>
+  </Appenders>
+  <Loggers>
+    <!-- The root logger at level "info" forwards events to the four appenders referenced below. -->
+    <!-- NOTE(review): PulsarAppenderWithKeyLookup is declared under Appenders but has no
+         AppenderRef here; the test looks it up by name and appends to it directly.
+         Confirm that leaving it out of the root logger is intentional. -->
+    <Root level="info">
+      <AppenderRef ref="PulsarAppenderWithLayout"/>
+      <AppenderRef ref="PulsarAppenderWithSerializedLayout"/>
+      <AppenderRef ref="AsyncPulsarAppender"/>
+      <AppenderRef ref="PulsarAppenderWithKey"/>
+    </Root>
+  </Loggers>
+</Configuration>
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to