Repository: logging-log4j2 Updated Branches: refs/heads/master ce1668592 -> 80c9dcbea
[LOG4J2-1113] New publisher Appender for ZeroMQ (using JeroMQ). Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/80c9dcbe Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/80c9dcbe Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/80c9dcbe Branch: refs/heads/master Commit: 80c9dcbeaed4976649ad5398fff2d3b1540d9cb7 Parents: ce16685 Author: ggregory <[email protected]> Authored: Wed Sep 9 16:01:27 2015 -0700 Committer: ggregory <[email protected]> Committed: Wed Sep 9 16:01:27 2015 -0700 ---------------------------------------------------------------------- log4j-core/pom.xml | 6 + .../appender/mom/jeromq/JeroMqAppender.java | 350 +++++++++++++++++++ .../appender/mom/jeromq/JeroMqAppenderTest.java | 130 +++++++ .../appender/mom/jeromq/JeroMqTestClient.java | 55 +++ .../logging/log4j/junit/LoggerContextRule.java | 25 ++ .../src/test/resources/JeroMqAppenderTest.xml | 30 ++ pom.xml | 5 + src/changes/changes.xml | 3 + src/site/site.xml | 1 + src/site/xdoc/manual/appenders.xml | 161 +++++++++ src/site/xdoc/runtime-dependencies.xml | 8 + 11 files changed, 774 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/log4j-core/pom.xml ---------------------------------------------------------------------- diff --git a/log4j-core/pom.xml b/log4j-core/pom.xml index 1c00fbc..f8193e8 100644 --- a/log4j-core/pom.xml +++ b/log4j-core/pom.xml @@ -114,6 +114,12 @@ <artifactId>kafka-clients</artifactId> <optional>true</optional> </dependency> + <!-- Used for ZeroMQ JeroMQ appender --> + <dependency> + <groupId>org.zeromq</groupId> + <artifactId>jeromq</artifactId> + <optional>true</optional> + </dependency> <!-- Used for compressing to formats other than zip and gz --> <dependency> <groupId>org.apache.commons</groupId> 
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java new file mode 100644 index 0000000..90ed8f5 --- /dev/null +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. 
+ */ + +package org.apache.logging.log4j.core.appender.mom.jeromq; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.Filter; +import org.apache.logging.log4j.core.Layout; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.config.Property; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginAttribute; +import org.apache.logging.log4j.core.config.plugins.PluginElement; +import org.apache.logging.log4j.core.config.plugins.PluginFactory; +import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; +import org.apache.logging.log4j.core.layout.PatternLayout; +import org.apache.logging.log4j.status.StatusLogger; +import org.apache.logging.log4j.util.PropertiesUtil; +import org.apache.logging.log4j.util.Strings; +import org.zeromq.ZMQ; +import org.zeromq.ZMQ.Socket; + +/** + * Sends log events to one or more ZeroMQ (JeroMQ) endpoints. + * <p> + * Requires the JeroMQ jar (LGPL as of 0.3.5) + * </p> + */ +// TODO +// Some methods are synchronized because a ZMQ.Socket is not thread-safe +// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be +// some issue on threads owning certain resources as opposed to others. +@Plugin(name = "JeroMQ", category = "Core", elementType = "appender", printObject = true) +public final class JeroMqAppender extends AbstractAppender { + + // Per ZMQ docs, there should usually only be one ZMQ context per process. + private static volatile ZMQ.Context context; + + private static Logger logger; + + // ZMQ sockets are not thread safe. 
+ private static ZMQ.Socket publisher; + + private static final long serialVersionUID = 1L; + + private static final String SIMPLE_NAME = JeroMqAppender.class.getSimpleName(); + + static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook"; + + static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads"; + + static { + logger = StatusLogger.getLogger(); + final PropertiesUtil managerProps = PropertiesUtil.getProperties(); + final Integer ioThreads = managerProps.getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1); + final Boolean enableShutdownHook = managerProps.getBooleanProperty(SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true); + final String simpleName = SIMPLE_NAME; + logger.trace("{} using ZMQ version {}", simpleName, ZMQ.getVersionString()); + logger.trace("{} creating ZMQ context with ioThreads={}", simpleName, ioThreads); + context = ZMQ.context(ioThreads); + logger.trace("{} created ZMQ context {}", simpleName, context); + if (enableShutdownHook) { + final Thread hook = new Thread(simpleName + "-shutdown") { + @Override + public void run() { + shutdown(); + } + }; + logger.trace("{} adding shutdown hook {}", simpleName, hook); + Runtime.getRuntime().addShutdownHook(hook); + } + } + + // The ZMQ.Socket class has other set methods that we do not cover because + // they throw unsupported operation exceptions. + @PluginFactory + public static JeroMqAppender createAppender( + // @formatter:off + @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name, + @PluginElement("Layout") Layout<?> layout, + @PluginElement("Filters") final Filter filter, + @PluginElement("Properties") final Property[] properties, + // Super attributes + @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions, + // ZMQ attributes; defaults picked from zmq.Options. 
+ @PluginAttribute(value="affinity", defaultLong=0) final long affinity, + @PluginAttribute(value="backlog", defaultLong=100) final long backlog, + @PluginAttribute(value="delayAttachOnConnect", defaultBoolean=false) final boolean delayAttachOnConnect, + @PluginAttribute(value="identity") final byte[] identity, + @PluginAttribute(value="ipv4Only", defaultBoolean=true) final boolean ipv4Only, + @PluginAttribute(value="linger", defaultLong=-1) final long linger, + @PluginAttribute(value="maxMsgSize", defaultLong=-1) final long maxMsgSize, + @PluginAttribute(value="rcvHwm", defaultLong=1000) final long rcvHwm, + @PluginAttribute(value="receiveBufferSize", defaultLong=0) final long receiveBufferSize, + @PluginAttribute(value="receiveTimeOut", defaultLong=-1) final int receiveTimeOut, + @PluginAttribute(value="reconnectIVL", defaultLong=100) final long reconnectIVL, + @PluginAttribute(value="reconnectIVLMax", defaultLong=0) final long reconnectIVLMax, + @PluginAttribute(value="sendBufferSize", defaultLong=0) final long sendBufferSize, + @PluginAttribute(value="sendTimeOut", defaultLong=-1) final int sendTimeOut, + @PluginAttribute(value="sndHwm", defaultLong=1000) final long sndHwm, + @PluginAttribute(value="tcpKeepAlive", defaultInt=-1) final int tcpKeepAlive, + @PluginAttribute(value="tcpKeepAliveCount", defaultLong=-1) final long tcpKeepAliveCount, + @PluginAttribute(value="tcpKeepAliveIdle", defaultLong=-1) final long tcpKeepAliveIdle, + @PluginAttribute(value="tcpKeepAliveInterval", defaultLong=-1) final long tcpKeepAliveInterval, + @PluginAttribute(value="xpubVerbose", defaultBoolean=false) final boolean xpubVerbose + // @formatter:on + ) { + if (layout == null) { + layout = PatternLayout.createDefaultLayout(); + } + List<String> endpoints; + if (properties == null) { + endpoints = new ArrayList<>(0); + } else { + endpoints = new ArrayList<>(properties.length); + for (final Property property : properties) { + if ("endpoint".equalsIgnoreCase(property.getName())) { 
+ final String value = property.getValue(); + if (Strings.isNotEmpty(value)) { + endpoints.add(value); + } + } + } + } + logger.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}", + name, filter, layout, ignoreExceptions, endpoints); + return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog, + delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut, + reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive, tcpKeepAliveCount, + tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose); + } + + static ZMQ.Context getContext() { + return context; + } + + private static ZMQ.Socket getPublisher() { + return publisher; + } + + private static ZMQ.Socket newPublisher() { + logger.trace("{} creating a new ZMQ PUB socket with context {}", SIMPLE_NAME, context); + final Socket socketPub = context.socket(ZMQ.PUB); + logger.trace("{} created new ZMQ PUB socket {}", SIMPLE_NAME, socketPub); + return socketPub; + } + + static void shutdown() { + if (context != null) { + logger.trace("{} terminating JeroMQ context {}", SIMPLE_NAME, context); + context.term(); + context = null; + } + } + + private final long affinity; + private final long backlog; + private final boolean delayAttachOnConnect; + private final List<String> endpoints; + private final byte[] identity; + private final int ioThreads = 1; + private final boolean ipv4Only; + private final long linger; + private final long maxMsgSize; + private final long rcvHwm; + private final long receiveBufferSize; + private final int receiveTimeOut; + private final long reconnectIVL; + private final long reconnectIVLMax; + private final long sendBufferSize; + private int sendRcFalse; + private int sendRcTrue; + private final int sendTimeOut; + private final long sndHwm; + private final int tcpKeepAlive; + private final long tcpKeepAliveCount; + private final long 
tcpKeepAliveIdle; + private final long tcpKeepAliveInterval; + private final boolean xpubVerbose; + + private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout, + final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog, + final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger, + final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut, + final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut, + final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle, + final long tcpKeepAliveInterval, final boolean xpubVerbose) { + super(name, filter, layout, ignoreExceptions); + this.endpoints = endpoints; + this.affinity = affinity; + this.backlog = backlog; + this.delayAttachOnConnect = delayAttachOnConnect; + this.identity = identity; + this.ipv4Only = ipv4Only; + this.linger = linger; + this.maxMsgSize = maxMsgSize; + this.rcvHwm = rcvHwm; + this.receiveBufferSize = receiveBufferSize; + this.receiveTimeOut = receiveTimeOut; + this.reconnectIVL = reconnectIVL; + this.reconnectIVLMax = reconnectIVLMax; + this.sendBufferSize = sendBufferSize; + this.sendTimeOut = sendTimeOut; + this.sndHwm = sndHWM; + this.tcpKeepAlive = tcpKeepAlive; + this.tcpKeepAliveCount = tcpKeepAliveCount; + this.tcpKeepAliveIdle = tcpKeepAliveIdle; + this.tcpKeepAliveInterval = tcpKeepAliveInterval; + this.xpubVerbose = xpubVerbose; + } + + @Override + public synchronized void append(final LogEvent event) { + final String formattedMessage = event.getMessage().getFormattedMessage(); + if (getPublisher().send(formattedMessage, 0)) { + sendRcTrue++; + } else { + sendRcFalse++; + logger.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, + formattedMessage); + } + } + + // not public, handy for testing + int 
getSendRcFalse() { + return sendRcFalse; + } + + // not public, handy for testing + int getSendRcTrue() { + return sendRcTrue; + } + + // not public, handy for testing + void resetSendRcs() { + sendRcTrue = sendRcFalse = 0; + } + + @Override + public synchronized void start() { + super.start(); + publisher = newPublisher(); + final String name = getName(); + final String prefix = "JeroMQ Appender"; + logger.debug("Starting {} {} using ZMQ version {}", prefix, name, ZMQ.getVersionString()); + logger.debug("{} {} context {} with ioThreads={}", prefix, name, context, ioThreads); + // + final ZMQ.Socket socketPub = getPublisher(); + logger.trace("{} {} setting {} publisher properties for instance {}", prefix, name, + socketPub.getClass().getName(), socketPub); + logger.trace("{} {} publisher setAffinity({})", prefix, name, affinity); + socketPub.setAffinity(affinity); + logger.trace("{} {} publisher setBacklog({})", prefix, name, backlog); + socketPub.setBacklog(backlog); + logger.trace("{} {} publisher setDelayAttachOnConnect({})", prefix, name, delayAttachOnConnect); + socketPub.setDelayAttachOnConnect(delayAttachOnConnect); + if (identity != null) { + logger.trace("{} {} publisher setIdentity({})", prefix, name, Arrays.toString(identity)); + socketPub.setIdentity(identity); + } + logger.trace("{} {} publisher setIPv4Only({})", prefix, name, ipv4Only); + socketPub.setIPv4Only(ipv4Only); + logger.trace("{} {} publisher setLinger({})", prefix, name, linger); + socketPub.setLinger(linger); + logger.trace("{} {} publisher setMaxMsgSize({})", prefix, name, maxMsgSize); + socketPub.setMaxMsgSize(maxMsgSize); + logger.trace("{} {} publisher setRcvHWM({})", prefix, name, rcvHwm); + socketPub.setRcvHWM(rcvHwm); + logger.trace("{} {} publisher setReceiveBufferSize({})", prefix, name, receiveBufferSize); + socketPub.setReceiveBufferSize(receiveBufferSize); + logger.trace("{} {} publisher setReceiveTimeOut({})", prefix, name, receiveTimeOut); + 
socketPub.setReceiveTimeOut(receiveTimeOut); + logger.trace("{} {} publisher setReconnectIVL({})", prefix, name, reconnectIVL); + socketPub.setReconnectIVL(reconnectIVL); + logger.trace("{} {} publisher setReconnectIVLMax({})", prefix, name, reconnectIVLMax); + socketPub.setReconnectIVLMax(reconnectIVLMax); + logger.trace("{} {} publisher setSendBufferSize({})", prefix, name, sendBufferSize); + socketPub.setSendBufferSize(sendBufferSize); + logger.trace("{} {} publisher setSendTimeOut({})", prefix, name, sendTimeOut); + socketPub.setSendTimeOut(sendTimeOut); + logger.trace("{} {} publisher setSndHWM({})", prefix, name, sndHwm); + socketPub.setSndHWM(sndHwm); + logger.trace("{} {} publisher setTCPKeepAlive({})", prefix, name, tcpKeepAlive); + socketPub.setTCPKeepAlive(tcpKeepAlive); + logger.trace("{} {} publisher setTCPKeepAliveCount({})", prefix, name, tcpKeepAliveCount); + socketPub.setTCPKeepAliveCount(tcpKeepAliveCount); + logger.trace("{} {} publisher setTCPKeepAliveIdle({})", prefix, name, tcpKeepAliveIdle); + socketPub.setTCPKeepAliveIdle(tcpKeepAliveIdle); + logger.trace("{} {} publisher setTCPKeepAliveInterval({})", prefix, name, tcpKeepAliveInterval); + socketPub.setTCPKeepAliveInterval(tcpKeepAliveInterval); + logger.trace("{} {} publisher setXpubVerbose({})", prefix, name, xpubVerbose); + socketPub.setXpubVerbose(xpubVerbose); + // + if (logger.isDebugEnabled()) { + logger.debug( + "Created JeroMQ {} publisher {} type {}, affinity={}, backlog={}, delayAttachOnConnect={}, events={}, IPv4Only={}, linger={}, maxMsgSize={}, multicastHops={}, " + + "rate={}, rcvHWM={}, receiveBufferSize={}, receiveTimeOut={}, reconnectIVL={}, reconnectIVLMax={}, recoveryInterval={}, sendBufferSize={}, " + + "sendTimeOut={}, sndHWM={}, TCPKeepAlive={}, TCPKeepAliveCount={}, TCPKeepAliveIdle={}, TCPKeepAliveInterval={}, TCPKeepAliveSetting={}", + name, socketPub, socketPub.getType(), socketPub.getAffinity(), socketPub.getBacklog(), + socketPub.getDelayAttachOnConnect(), 
socketPub.getEvents(), socketPub.getIPv4Only(), + socketPub.getLinger(), socketPub.getMaxMsgSize(), socketPub.getMulticastHops(), socketPub.getRate(), + socketPub.getRcvHWM(), socketPub.getReceiveBufferSize(), socketPub.getReceiveTimeOut(), + socketPub.getReconnectIVL(), socketPub.getReconnectIVLMax(), socketPub.getRecoveryInterval(), + socketPub.getSendBufferSize(), socketPub.getSendTimeOut(), socketPub.getSndHWM(), + socketPub.getTCPKeepAlive(), socketPub.getTCPKeepAliveCount(), socketPub.getTCPKeepAliveIdle(), + socketPub.getTCPKeepAliveInterval(), socketPub.getTCPKeepAliveSetting()); + } + for (final String endpoint : endpoints) { + logger.debug("Binding {} appender {} to endpoint {}", SIMPLE_NAME, name, endpoint); + socketPub.bind(endpoint); + } + } + + @Override + public synchronized void stop() { + super.stop(); + final Socket socketPub = getPublisher(); + if (socketPub != null) { + logger.debug("Closing {} appender {} publisher {}", SIMPLE_NAME, getName(), socketPub); + socketPub.close(); + publisher = null; + } + } + + @Override + public String toString() { + return "JeroMqAppender [context=" + context + ", publisher=" + publisher + ", endpoints=" + endpoints + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java new file mode 100644 index 0000000..041419c --- /dev/null +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ + +package org.apache.logging.log4j.core.appender.mom.jeromq; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.logging.log4j.core.Logger; +import org.apache.logging.log4j.junit.LoggerContextRule; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; + +public class JeroMqAppenderTest { + + @AfterClass + public static void tearDownClass() { + // JeroMqAppender.shutdown(); + } + + @ClassRule + public static LoggerContextRule ctx = new LoggerContextRule("JeroMqAppenderTest.xml"); + + @Test(timeout = 10000) + public void testAppenderLifeCycle() throws Exception { + // do nothing to make sure the appender starts and stops without + // locking up resources. 
+ Assert.assertNotNull(JeroMqAppender.getContext()); + } + + @Test(timeout = 10000) + public void testClientServer() throws Exception { + final JeroMqAppender appender = ctx.getRequiredAppender("JeroMQAppender", JeroMqAppender.class); + final int expectedReceiveCount = 2; + final JeroMqTestClient client = new JeroMqTestClient(JeroMqAppender.getContext(), "tcp://localhost:5556", expectedReceiveCount); + final ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + final Future<List<String>> future = executor.submit(client); + Thread.sleep(100); + final Logger logger = ctx.getLogger(getClass().getName()); + appender.resetSendRcs(); + logger.info("Hello"); + logger.info("Again"); + final List<String> list = future.get(); + Assert.assertEquals(expectedReceiveCount, appender.getSendRcTrue()); + Assert.assertEquals(0, appender.getSendRcFalse()); + Assert.assertEquals("Hello", list.get(0)); + Assert.assertEquals("Again", list.get(1)); + } finally { + executor.shutdown(); + } + } + + @Test(timeout = 10000) + public void testMultiThreadedServer() throws Exception { + final int nThreads = 10; + final JeroMqAppender appender = ctx.getRequiredAppender("JeroMQAppender", JeroMqAppender.class); + final int expectedReceiveCount = 2 * nThreads; + final JeroMqTestClient client = new JeroMqTestClient(JeroMqAppender.getContext(), "tcp://localhost:5556", + expectedReceiveCount); + final ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + final Future<List<String>> future = executor.submit(client); + Thread.sleep(100); + final Logger logger = ctx.getLogger(getClass().getName()); + appender.resetSendRcs(); + final ExecutorService fixedThreadPool = Executors.newFixedThreadPool(nThreads); + for (int i = 0; i < 10.; i++) { + fixedThreadPool.submit(new Runnable() { + @Override + public void run() { + logger.info("Hello"); + logger.info("Again"); + } + }); + } + final List<String> list = future.get(); + Assert.assertEquals(expectedReceiveCount, 
appender.getSendRcTrue()); + Assert.assertEquals(0, appender.getSendRcFalse()); + int hello = 0; + int again = 0; + for (final String string : list) { + switch (string) { + case "Hello": + hello++; + break; + case "Again": + again++; + break; + default: + Assert.fail("Unexpected message: " + string); + } + } + Assert.assertEquals(nThreads, hello); + Assert.assertEquals(nThreads, again); + } finally { + executor.shutdown(); + } + } + + @Test(timeout = 10000) + public void testServerOnly() throws Exception { + final Logger logger = ctx.getLogger(getClass().getName()); + final JeroMqAppender appender = ctx.getRequiredAppender("JeroMQAppender", JeroMqAppender.class); + appender.resetSendRcs(); + logger.info("Hello"); + logger.info("Again"); + Assert.assertEquals(2, appender.getSendRcTrue()); + Assert.assertEquals(0, appender.getSendRcFalse()); + } +} http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqTestClient.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqTestClient.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqTestClient.java new file mode 100644 index 0000000..ddd06ab --- /dev/null +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqTestClient.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ + +package org.apache.logging.log4j.core.appender.mom.jeromq; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; + +import org.zeromq.ZMQ; + +class JeroMqTestClient implements Callable<List<String>> { + + private final ZMQ.Context context; + + private final String endpoint; + private final List<String> messages; + private final int receiveCount; + + JeroMqTestClient(final ZMQ.Context context, final String endpoint, final int receiveCount) { + super(); + this.context = context; + this.endpoint = endpoint; + this.receiveCount = receiveCount; + this.messages = new ArrayList<>(receiveCount); + } + + @Override + public List<String> call() throws Exception { + try (ZMQ.Socket subscriber = context.socket(ZMQ.SUB)) { + subscriber.connect(endpoint); + subscriber.subscribe(new byte[0]); + for (int messageNum = 0; messageNum < receiveCount + && !Thread.currentThread().isInterrupted(); messageNum++) { + // Use trim to remove the trailing '0' character + messages.add(subscriber.recvStr(0).trim()); + } + } + return messages; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java b/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java index 65836d6..006d70c 100644 --- 
a/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java +++ b/log4j-core/src/test/java/org/apache/logging/log4j/junit/LoggerContextRule.java @@ -117,6 +117,17 @@ public class LoggerContextRule implements TestRule { } /** + * Gets a named Appender for this LoggerContext. + * @param <T> The target Appender class + * @param name the name of the Appender to look up. + * @param cls The target Appender class + * @return the named Appender or {@code null} if it wasn't defined in the configuration. + */ + public <T extends Appender> T getAppender(final String name, Class<T> cls) { + return cls.cast(getConfiguration().getAppenders().get(name)); + } + + /** * Gets a named Appender or throws an exception for this LoggerContext. * @param name the name of the Appender to look up. * @return the named Appender. @@ -129,6 +140,20 @@ public class LoggerContextRule implements TestRule { } /** + * Gets a named Appender or throws an exception for this LoggerContext. + * @param <T> The target Appender class + * @param name the name of the Appender to look up. + * @param cls The target Appender class + * @return the named Appender. + * @throws AssertionError if the Appender doesn't exist. + */ + public <T extends Appender> T getRequiredAppender(final String name, Class<T> cls) { + final T appender = getAppender(name, cls); + assertNotNull("Appender named " + name + " was null.", appender); + return appender; + } + + /** * Gets a named ListAppender or throws an exception for this LoggerContext. * @param name the name of the ListAppender to look up. * @return the named ListAppender. 
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/log4j-core/src/test/resources/JeroMqAppenderTest.xml ---------------------------------------------------------------------- diff --git a/log4j-core/src/test/resources/JeroMqAppenderTest.xml b/log4j-core/src/test/resources/JeroMqAppenderTest.xml new file mode 100644 index 0000000..119fc00 --- /dev/null +++ b/log4j-core/src/test/resources/JeroMqAppenderTest.xml @@ -0,0 +1,30 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache license, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the license for the specific language governing permissions and + ~ limitations under the license. 
+ --> +<Configuration name="JeroMQAppenderTest" status="TRACE"> + <Appenders> + <JeroMQ name="JeroMQAppender"> + <Property name="endpoint">tcp://*:5556</Property> + <Property name="endpoint">ipc://info-topic</Property> + </JeroMQ> + </Appenders> + <Loggers> + <Root level="info"> + <AppenderRef ref="JeroMQAppender"/> + </Root> + </Loggers> +</Configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 019b089..fbbb657 100644 --- a/pom.xml +++ b/pom.xml @@ -565,6 +565,11 @@ <version>0.8.2.1</version> </dependency> <dependency> + <groupId>org.zeromq</groupId> + <artifactId>jeromq</artifactId> + <version>0.3.5</version> + </dependency> + <dependency> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> <version>2.5</version> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/src/changes/changes.xml ---------------------------------------------------------------------- diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 5de063c..6d1d57e 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -40,6 +40,9 @@ <action issue="LOG4J2-1107" dev="ggregory" type="add" due-to="Mikael Ståldal"> New Appender for Apache Kafka. </action> + <action issue="LOG4J2-1113" dev="ggregory" type="add" due-to="Gary Gregory"> + New publisher Appender for ZeroMQ (using JeroMQ). + </action> <action issue="LOG4J2-1088" dev="ggregory" type="add" due-to="Gary Gregory"> Add Comma Separated Value (CSV) layouts for parameter and event logging. 
</action> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/src/site/site.xml ---------------------------------------------------------------------- diff --git a/src/site/site.xml b/src/site/site.xml index 3a2940d..7c82498 100644 --- a/src/site/site.xml +++ b/src/site/site.xml @@ -131,6 +131,7 @@ <item name="SMTP" href="/manual/appenders.html#SMTPAppender"/> <item name="Socket" href="/manual/appenders.html#SocketAppender"/> <item name="Syslog" href="/manual/appenders.html#SyslogAppender"/> + <item name="ZeroMQ" href="/manual/appenders.html#ZeroMQAppender"/> </item> <item name="Layouts" href="/manual/layouts.html" collapse="true"> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/src/site/xdoc/manual/appenders.xml ---------------------------------------------------------------------- diff --git a/src/site/xdoc/manual/appenders.xml b/src/site/xdoc/manual/appenders.xml index a99ad88..49f7dec 100644 --- a/src/site/xdoc/manual/appenders.xml +++ b/src/site/xdoc/manual/appenders.xml @@ -3288,6 +3288,167 @@ public class JpaLogEntity extends AbstractLogEventWrapperEntity { </Configuration>]]></pre> </subsection> + <a name="ZeroMQAppender"/> + <subsection name="ZeroMQ Appender"> + <p> + The ZeroMQ appender uses the <a href="https://github.com/zeromq/jeromq">JeroMQ</a> library to send log + events to one or more endpoints. + </p> + <p> + This is a simple JeroMQ configuration: + </p> + <pre class="prettyprint linenums"><![CDATA[<?xml version="1.0" encoding="UTF-8"?> +<Configuration name="JeroMQAppenderTest" status="TRACE"> + <Appenders> + <JeroMQ name="JeroMQAppender"> + <Property name="endpoint">tcp://*:5556</Property> + <Property name="endpoint">ipc://info-topic</Property> + </JeroMQ> + </Appenders> + <Loggers> + <Root level="info"> + <AppenderRef ref="JeroMQAppender"/> + </Root> + </Loggers> +</Configuration>]]></pre> + <p> + The table below describes all options. Please consult the JeroMQ and ZeroMQ documentation for details. 
+ </p> + <table> + <caption align="top">JeroMQ Parameters</caption> + <tr> + <th>Parameter Name</th> + <th>Type</th> + <th>Description</th> + </tr> + <tr> + <td>name</td> + <td>String</td> + <td>The name of the Appender.</td> + </tr> + <tr> + <td>Layout</td> + <td>Layout</td> + <td>The Layout of the Appender.</td> + </tr> + <tr> + <td>Filters</td> + <td>Filter</td> + <td>The Filters of the Appender.</td> + </tr> + <tr> + <td>Property</td> + <td>Property</td> + <td>One or more Property elements, named <code>endpoint</code>.</td> + </tr> + <tr> + <td>ignoreExceptions</td> + <td>boolean</td> + <td>If true, exceptions will be logged and suppressed. If false, exceptions will be logged and then passed to the application.</td> + </tr> + <tr> + <td>affinity</td> + <td>long</td> + <td>The ZMQ_AFFINITY option. Defaults to 0.</td> + </tr> + <tr> + <td>backlog</td> + <td>long</td> + <td>The ZMQ_BACKLOG option. Defaults to 100.</td> + </tr> + <tr> + <td>delayAttachOnConnect</td> + <td>boolean</td> + <td>The ZMQ_DELAY_ATTACH_ON_CONNECT option. Defaults to false.</td> + </tr> + <tr> + <td>identity</td> + <td>byte[]</td> + <td>The ZMQ_IDENTITY option. Defaults to none.</td> + </tr> + <tr> + <td>ipv4Only</td> + <td>boolean</td> + <td>The ZMQ_IPV4ONLY option. Defaults to true.</td> + </tr> + <tr> + <td>linger</td> + <td>long</td> + <td>The ZMQ_LINGER option. Defaults to -1.</td> + </tr> + <tr> + <td>maxMsgSize</td> + <td>long</td> + <td>The ZMQ_MAXMSGSIZE option. Defaults to -1.</td> + </tr> + <tr> + <td>rcvHwm</td> + <td>long</td> + <td>The ZMQ_RCVHWM option. Defaults to 1000.</td> + </tr> + <tr> + <td>receiveBufferSize</td> + <td>long</td> + <td>The ZMQ_RCVBUF option. Defaults to 0.</td> + </tr> + <tr> + <td>receiveTimeOut</td> + <td>int</td> + <td>The ZMQ_RCVTIMEO option. Defaults to -1.</td> + </tr> + <tr> + <td>reconnectIVL</td> + <td>long</td> + <td>The ZMQ_RECONNECT_IVL option.
Defaults to 100.</td> + </tr> + <tr> + <td>reconnectIVLMax</td> + <td>long</td> + <td>The ZMQ_RECONNECT_IVL_MAX option. Defaults to 0.</td> + </tr> + <tr> + <td>sendBufferSize</td> + <td>long</td> + <td>The ZMQ_SNDBUF option. Defaults to 0.</td> + </tr> + <tr> + <td>sendTimeOut</td> + <td>int</td> + <td>The ZMQ_SNDTIMEO option. Defaults to -1.</td> + </tr> + <tr> + <td>sndHwm</td> + <td>long</td> + <td>The ZMQ_SNDHWM option. Defaults to 1000.</td> + </tr> + <tr> + <td>tcpKeepAlive</td> + <td>int</td> + <td>The ZMQ_TCP_KEEPALIVE option. Defaults to -1.</td> + </tr> + <tr> + <td>tcpKeepAliveCount</td> + <td>long</td> + <td>The ZMQ_TCP_KEEPALIVE_CNT option. Defaults to -1.</td> + </tr> + <tr> + <td>tcpKeepAliveIdle</td> + <td>long</td> + <td>The ZMQ_TCP_KEEPALIVE_IDLE option. Defaults to -1.</td> + </tr> + <tr> + <td>tcpKeepAliveInterval</td> + <td>long</td> + <td>The ZMQ_TCP_KEEPALIVE_INTVL option. Defaults to -1.</td> + </tr> + <tr> + <td>xpubVerbose</td> + <td>boolean</td> + <td>The ZMQ_XPUB_VERBOSE option. Defaults to false.</td> + </tr> + </table> + </subsection> + </section> </body> </document> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/80c9dcbe/src/site/xdoc/runtime-dependencies.xml ---------------------------------------------------------------------- diff --git a/src/site/xdoc/runtime-dependencies.xml b/src/site/xdoc/runtime-dependencies.xml index bbee62a..ace4141 100644 --- a/src/site/xdoc/runtime-dependencies.xml +++ b/src/site/xdoc/runtime-dependencies.xml @@ -100,6 +100,14 @@ In addition, XZ requires <a href="http://tukaani.org/xz/java.html">XZ for Java</a>. </td> </tr> + <tr> + <td>ZeroMQ Appender</td> + <td> + The ZeroMQ appender uses the <a href="https://github.com/zeromq/jeromq">JeroMQ</a> library which is + licensed under the terms of the GNU Lesser General Public License (LGPL). For details see the + files <code>COPYING</code> and <code>COPYING.LESSER</code> included with the JeroMQ distribution. 
+ </td> + </tr> </table> <a name="log4j-jcl" />
