Improve JCTools usage in AsyncAppender. This is based on a patch by Anthony Maire (reformatted, with final modifiers added) on LOG4J2-1439.
* Add configurable wait strategies. * Always keep some empty space in the queue so that the producer thread does not suffer from false sharing when the queue is full. * Add a queue latency test to check performance without going through AsyncAppender, which often hides queue performance issues. * Upgrade to JCTools 1.2.1. * Use spin wait strategy in perf tests to match other perf tests. Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/67d78505 Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/67d78505 Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/67d78505 Branch: refs/heads/master Commit: 67d78505afac27f3366fbc6939aacef92bc373c7 Parents: ac4f156 Author: Matt Sicker <boa...@gmail.com> Authored: Thu Jun 30 12:48:02 2016 -0500 Committer: Matt Sicker <boa...@gmail.com> Committed: Thu Jun 30 12:48:02 2016 -0500 ---------------------------------------------------------------------- .../core/async/JCToolsBlockingQueueFactory.java | 121 ++++++++++++++++--- .../core/async/perftest/AbstractRunQueue.java | 91 ++++++++++++++ .../core/async/perftest/ResponseTimeTest.java | 1 + .../core/async/perftest/RunConversant.java | 31 +++++ .../log4j/core/async/perftest/RunJCTools.java | 32 +++++ .../perf5AsyncApndMpscQNoLoc-noOpAppender.xml | 2 +- .../perf5AsyncApndMpscQWithLoc-noOpAppender.xml | 2 +- pom.xml | 2 +- 8 files changed, 263 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/67d78505/log4j-core/src/main/java/org/apache/logging/log4j/core/async/JCToolsBlockingQueueFactory.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/JCToolsBlockingQueueFactory.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/JCToolsBlockingQueueFactory.java index 
ccda263..9ae140d 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/JCToolsBlockingQueueFactory.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/JCToolsBlockingQueueFactory.java @@ -7,6 +7,7 @@ import java.util.concurrent.locks.LockSupport; import org.apache.logging.log4j.core.config.Node; import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginAttribute; import org.apache.logging.log4j.core.config.plugins.PluginFactory; import org.jctools.queues.MpscArrayQueue; @@ -18,17 +19,21 @@ import org.jctools.queues.MpscArrayQueue; @Plugin(name = "JCToolsBlockingQueue", category = Node.CATEGORY, elementType = BlockingQueueFactory.ELEMENT_TYPE) public class JCToolsBlockingQueueFactory<E> implements BlockingQueueFactory<E> { - private JCToolsBlockingQueueFactory() { + private final WaitStrategy waitStrategy; + + private JCToolsBlockingQueueFactory(final WaitStrategy waitStrategy) { + this.waitStrategy = waitStrategy; } @Override public BlockingQueue<E> create(final int capacity) { - return new MpscBlockingQueue<>(capacity); + return new MpscBlockingQueue<>(capacity, waitStrategy); } @PluginFactory - public static <E> JCToolsBlockingQueueFactory<E> createFactory() { - return new JCToolsBlockingQueueFactory<>(); + public static <E> JCToolsBlockingQueueFactory<E> createFactory( + @PluginAttribute(value = "WaitStrategy", defaultString = "PARK") final WaitStrategy waitStrategy) { + return new JCToolsBlockingQueueFactory<>(waitStrategy); } /** @@ -36,8 +41,11 @@ public class JCToolsBlockingQueueFactory<E> implements BlockingQueueFactory<E> { */ private static final class MpscBlockingQueue<E> extends MpscArrayQueue<E> implements BlockingQueue<E> { - MpscBlockingQueue(final int capacity) { + private final JCToolsBlockingQueueFactory.WaitStrategy waitStrategy; + + MpscBlockingQueue(final int capacity, final JCToolsBlockingQueueFactory.WaitStrategy waitStrategy) { 
super(capacity); + this.waitStrategy = waitStrategy; } @Override @@ -49,29 +57,59 @@ public class JCToolsBlockingQueueFactory<E> implements BlockingQueueFactory<E> { public int drainTo(final Collection<? super E> c, final int maxElements) { return drain(new Consumer<E>() { @Override - public void accept(E arg0) { - c.add(arg0); + public void accept(E e) { + c.add(e); } }, maxElements); } @Override public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException { - // TODO Auto-generated method stub - return offer(e); + int idleCounter = 0; + final long timeoutNanos = System.nanoTime() + unit.toNanos(timeout); + do { + if (offer(e)) { + return true; + } else if (System.nanoTime() - timeoutNanos > 0) { + return false; + } + idleCounter = waitStrategy.idle(idleCounter); + } while (!Thread.interrupted()); //clear interrupted flag + throw new InterruptedException(); } @Override public E poll(final long timeout, final TimeUnit unit) throws InterruptedException { - // TODO Auto-generated method stub - return poll(); + int idleCounter = 0; + final long timeoutNanos = System.nanoTime() + unit.toNanos(timeout); + do { + E result = poll(); + if (result != null) { + return result; + } else if (System.nanoTime() - timeoutNanos > 0) { + return null; + } + idleCounter = waitStrategy.idle(idleCounter); + } while (!Thread.interrupted()); //clear interrupted flag + throw new InterruptedException(); } @Override public void put(final E e) throws InterruptedException { - while (!relaxedOffer(e)) { - LockSupport.parkNanos(1L); - } + int idleCounter = 0; + do { + if (offer(e)) { + return; + } + idleCounter = waitStrategy.idle(idleCounter); + } while (!Thread.interrupted()); //clear interrupted flag + throw new InterruptedException(); + } + + @Override + public boolean offer(final E e) { + //keep 2 cache lines empty to avoid false sharing that will slow the consumer thread when queue is full. 
+ return offerIfBelowThreshold(e, capacity() - 32); } @Override @@ -81,13 +119,64 @@ public class JCToolsBlockingQueueFactory<E> implements BlockingQueueFactory<E> { @Override public E take() throws InterruptedException { - for (; ; ) { - final E result = poll(); + int idleCounter = 100; + do { + final E result = relaxedPoll(); if (result != null) { return result; } + idleCounter = waitStrategy.idle(idleCounter); + } while (!Thread.interrupted()); //clear interrupted flag + throw new InterruptedException(); + } + } + + public enum WaitStrategy { + SPIN(new Idle() { + @Override + public int idle(final int idleCounter) { + return idleCounter + 1; + } + }), + YIELD(new Idle() { + @Override + public int idle(final int idleCounter) { + Thread.yield(); + return idleCounter + 1; + } + }), + PARK(new Idle() { + @Override + public int idle(final int idleCounter) { LockSupport.parkNanos(1L); + return idleCounter + 1; } + }), + PROGRESSIVE(new Idle() { + @Override + public int idle(final int idleCounter) { + if (idleCounter > 200) { + LockSupport.parkNanos(1L); + } else if (idleCounter > 100) { + Thread.yield(); + } + return idleCounter + 1; + } + }); + + private final Idle idle; + + private int idle(final int idleCounter) { + return idle.idle(idleCounter); + } + + WaitStrategy(final Idle idle) { + this.idle = idle; } } + + private interface Idle { + int idle(int idleCounter); + } + } http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/67d78505/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/AbstractRunQueue.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/AbstractRunQueue.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/AbstractRunQueue.java new file mode 100644 index 0000000..15ba61b --- /dev/null +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/AbstractRunQueue.java @@ -0,0 +1,91 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.logging.log4j.core.async.perftest; + +import java.util.Objects; +import java.util.concurrent.BlockingQueue; + +import org.apache.logging.log4j.core.async.perftest.ResponseTimeTest.PrintingAsyncQueueFullPolicy; + +import com.lmax.disruptor.collections.Histogram; + +public abstract class AbstractRunQueue implements IPerfTestRunner { + + abstract BlockingQueue<String> createQueue(int capacity); + + private static final String STOP = "STOP_TEST"; + private volatile boolean stopped = false; + private final BlockingQueue<String> queue = createQueue(256 * 1024); + private final Thread backGroundThread; + + AbstractRunQueue() { + backGroundThread = new Thread(new Runnable() { + @Override + public void run() { + for (; ; ) { + try { + if (Objects.equals(queue.take(), STOP)) { + break; + } + } catch (InterruptedException e) { + e.printStackTrace(); + break; + } + } + } + }); + backGroundThread.start(); + } + + @Override + public void runThroughputTest(final int lines, final Histogram histogram) { + } + + + @Override + public void runLatencyTest(final int samples, final Histogram histogram, + final long nanoTimeCost, final int threadCount) { + } + + + 
@Override + public final void shutdown() { + stopped = true; + try { + queue.put(STOP); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + + @Override + public final void log(final String finalMessage) { + if (stopped) { + return; + } + if (!queue.offer(finalMessage)) { + PrintingAsyncQueueFullPolicy.ringbufferFull.incrementAndGet(); + try { + queue.put(finalMessage); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + } +} http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/67d78505/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/ResponseTimeTest.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/ResponseTimeTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/ResponseTimeTest.java index c09c428..93f0f2e 100644 --- a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/ResponseTimeTest.java +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/ResponseTimeTest.java @@ -156,6 +156,7 @@ public class ResponseTimeTest { final int COUNT = (1000 * 1000) / threadCount; runLatencyTest(logger, TEST_DURATION_MILLIS, COUNT, loadMessagesPerSec, idleStrategy, serviceTmHistograms, responseTmHistograms, threadCount); + logger.shutdown(); final long end = System.currentTimeMillis(); // ... 
and report the results http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/67d78505/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunConversant.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunConversant.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunConversant.java new file mode 100644 index 0000000..9b2adff --- /dev/null +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunConversant.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. 
+ */ +package org.apache.logging.log4j.core.async.perftest; + +import java.util.concurrent.BlockingQueue; + +import org.apache.logging.log4j.core.async.DisruptorBlockingQueueFactory; + +import com.conversantmedia.util.concurrent.SpinPolicy; + +public class RunConversant extends AbstractRunQueue { + + @Override + BlockingQueue<String> createQueue(int capacity) { + return DisruptorBlockingQueueFactory.<String>createFactory(SpinPolicy.SPINNING).create(capacity); + } +} http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/67d78505/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunJCTools.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunJCTools.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunJCTools.java new file mode 100644 index 0000000..a5a632c --- /dev/null +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunJCTools.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. 
+ */ +package org.apache.logging.log4j.core.async.perftest; + +import java.util.concurrent.BlockingQueue; + +import org.apache.logging.log4j.core.async.JCToolsBlockingQueueFactory; +import org.apache.logging.log4j.core.async.JCToolsBlockingQueueFactory.WaitStrategy; + +public class RunJCTools extends AbstractRunQueue { + + @Override + BlockingQueue<String> createQueue(int capacity) { + return JCToolsBlockingQueueFactory.<String>createFactory(WaitStrategy.SPIN).create(capacity); + } + + +} http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/67d78505/log4j-perf/src/main/resources/perf5AsyncApndMpscQNoLoc-noOpAppender.xml ---------------------------------------------------------------------- diff --git a/log4j-perf/src/main/resources/perf5AsyncApndMpscQNoLoc-noOpAppender.xml b/log4j-perf/src/main/resources/perf5AsyncApndMpscQNoLoc-noOpAppender.xml index 26cd946..372c8bd 100644 --- a/log4j-perf/src/main/resources/perf5AsyncApndMpscQNoLoc-noOpAppender.xml +++ b/log4j-perf/src/main/resources/perf5AsyncApndMpscQNoLoc-noOpAppender.xml @@ -21,7 +21,7 @@ </CountingNoOp> <Async name="Async" blocking="true" bufferSize="262144"> <appender-ref ref="NoOp"/> - <JCToolsBlockingQueue/> + <JCToolsBlockingQueue waitStrategy="SPIN"/> </Async> </Appenders> <Loggers> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/67d78505/log4j-perf/src/main/resources/perf5AsyncApndMpscQWithLoc-noOpAppender.xml ---------------------------------------------------------------------- diff --git a/log4j-perf/src/main/resources/perf5AsyncApndMpscQWithLoc-noOpAppender.xml b/log4j-perf/src/main/resources/perf5AsyncApndMpscQWithLoc-noOpAppender.xml index 3bf3c98..c2f6de0 100644 --- a/log4j-perf/src/main/resources/perf5AsyncApndMpscQWithLoc-noOpAppender.xml +++ b/log4j-perf/src/main/resources/perf5AsyncApndMpscQWithLoc-noOpAppender.xml @@ -21,7 +21,7 @@ </CountingNoOp> <Async name="Async" blocking="true" bufferSize="262144" includeLocation="true"> <appender-ref ref="NoOp"/> - 
<JCToolsBlockingQueue/> + <JCToolsBlockingQueue waitStrategy="SPIN"/> </Async> </Appenders> <Loggers> http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/67d78505/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ec6bf85..b95be66 100644 --- a/pom.xml +++ b/pom.xml @@ -225,7 +225,7 @@ <activemq.version>5.13.2</activemq.version> <!-- Allow Clirr severity to be overriden by the command-line option -DminSeverity=level --> <minSeverity>info</minSeverity> - <jctoolsVersion>1.2</jctoolsVersion> + <jctoolsVersion>1.2.1</jctoolsVersion> </properties> <pluginRepositories> <pluginRepository>