Improve JCTools usage in AsyncAppender

This is based on a patch by Anthony Maire (reformatted, with final modifiers 
added) on LOG4J2-1439.

* Adds configurable wait strategies.
* Always keep some empty space in the queue so that the producer thread does 
not suffer from false sharing when the queue is full.
* Add a queue latency test to check performance without using AsyncAppender, 
which often hides queue performance issues.
* Upgrade to JCTools 1.2.1.
* Use spin wait strategy in perf tests to match other perf tests.


Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/67d78505
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/67d78505
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/67d78505

Branch: refs/heads/master
Commit: 67d78505afac27f3366fbc6939aacef92bc373c7
Parents: ac4f156
Author: Matt Sicker <boa...@gmail.com>
Authored: Thu Jun 30 12:48:02 2016 -0500
Committer: Matt Sicker <boa...@gmail.com>
Committed: Thu Jun 30 12:48:02 2016 -0500

----------------------------------------------------------------------
 .../core/async/JCToolsBlockingQueueFactory.java | 121 ++++++++++++++++---
 .../core/async/perftest/AbstractRunQueue.java   |  91 ++++++++++++++
 .../core/async/perftest/ResponseTimeTest.java   |   1 +
 .../core/async/perftest/RunConversant.java      |  31 +++++
 .../log4j/core/async/perftest/RunJCTools.java   |  32 +++++
 .../perf5AsyncApndMpscQNoLoc-noOpAppender.xml   |   2 +-
 .../perf5AsyncApndMpscQWithLoc-noOpAppender.xml |   2 +-
 pom.xml                                         |   2 +-
 8 files changed, 263 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/67d78505/log4j-core/src/main/java/org/apache/logging/log4j/core/async/JCToolsBlockingQueueFactory.java
----------------------------------------------------------------------
diff --git 
a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/JCToolsBlockingQueueFactory.java
 
b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/JCToolsBlockingQueueFactory.java
index ccda263..9ae140d 100644
--- 
a/log4j-core/src/main/java/org/apache/logging/log4j/core/async/JCToolsBlockingQueueFactory.java
+++ 
b/log4j-core/src/main/java/org/apache/logging/log4j/core/async/JCToolsBlockingQueueFactory.java
@@ -7,6 +7,7 @@ import java.util.concurrent.locks.LockSupport;
 
 import org.apache.logging.log4j.core.config.Node;
 import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
 import org.jctools.queues.MpscArrayQueue;
 
@@ -18,17 +19,21 @@ import org.jctools.queues.MpscArrayQueue;
 @Plugin(name = "JCToolsBlockingQueue", category = Node.CATEGORY, elementType = 
BlockingQueueFactory.ELEMENT_TYPE)
 public class JCToolsBlockingQueueFactory<E> implements BlockingQueueFactory<E> 
{
 
-    private JCToolsBlockingQueueFactory() {
+    private final WaitStrategy waitStrategy;
+
+    private JCToolsBlockingQueueFactory(final WaitStrategy waitStrategy) {
+        this.waitStrategy = waitStrategy;
     }
 
     @Override
     public BlockingQueue<E> create(final int capacity) {
-        return new MpscBlockingQueue<>(capacity);
+        return new MpscBlockingQueue<>(capacity, waitStrategy);
     }
 
     @PluginFactory
-    public static <E> JCToolsBlockingQueueFactory<E> createFactory() {
-        return new JCToolsBlockingQueueFactory<>();
+    public static <E> JCToolsBlockingQueueFactory<E> createFactory(
+        @PluginAttribute(value = "WaitStrategy", defaultString = "PARK") final 
WaitStrategy waitStrategy) {
+        return new JCToolsBlockingQueueFactory<>(waitStrategy);
     }
 
     /**
@@ -36,8 +41,11 @@ public class JCToolsBlockingQueueFactory<E> implements 
BlockingQueueFactory<E> {
      */
     private static final class MpscBlockingQueue<E> extends MpscArrayQueue<E> 
implements BlockingQueue<E> {
 
-        MpscBlockingQueue(final int capacity) {
+        private final JCToolsBlockingQueueFactory.WaitStrategy waitStrategy;
+
+        MpscBlockingQueue(final int capacity, final 
JCToolsBlockingQueueFactory.WaitStrategy waitStrategy) {
             super(capacity);
+            this.waitStrategy = waitStrategy;
         }
 
         @Override
@@ -49,29 +57,59 @@ public class JCToolsBlockingQueueFactory<E> implements 
BlockingQueueFactory<E> {
         public int drainTo(final Collection<? super E> c, final int 
maxElements) {
             return drain(new Consumer<E>() {
                 @Override
-                public void accept(E arg0) {
-                    c.add(arg0);
+                public void accept(E e) {
+                    c.add(e);
                 }
             }, maxElements);
         }
 
         @Override
         public boolean offer(final E e, final long timeout, final TimeUnit 
unit) throws InterruptedException {
-            // TODO Auto-generated method stub
-            return offer(e);
+            int idleCounter = 0;
+            final long timeoutNanos = System.nanoTime() + 
unit.toNanos(timeout);
+            do {
+                if (offer(e)) {
+                    return true;
+                } else if (System.nanoTime() - timeoutNanos > 0) {
+                    return false;
+                }
+                idleCounter = waitStrategy.idle(idleCounter);
+            } while (!Thread.interrupted()); //clear interrupted flag
+            throw new InterruptedException();
         }
 
         @Override
         public E poll(final long timeout, final TimeUnit unit) throws 
InterruptedException {
-            // TODO Auto-generated method stub
-            return poll();
+            int idleCounter = 0;
+            final long timeoutNanos = System.nanoTime() + 
unit.toNanos(timeout);
+            do {
+                E result = poll();
+                if (result != null) {
+                    return result;
+                } else if (System.nanoTime() - timeoutNanos > 0) {
+                    return null;
+                }
+                idleCounter = waitStrategy.idle(idleCounter);
+            } while (!Thread.interrupted()); //clear interrupted flag
+            throw new InterruptedException();
         }
 
         @Override
         public void put(final E e) throws InterruptedException {
-            while (!relaxedOffer(e)) {
-                LockSupport.parkNanos(1L);
-            }
+            int idleCounter = 0;
+            do {
+                if (offer(e)) {
+                    return;
+                }
+                idleCounter = waitStrategy.idle(idleCounter);
+            } while (!Thread.interrupted()); //clear interrupted flag
+            throw new InterruptedException();
+        }
+
+        @Override
+        public boolean offer(final E e) {
+            //keep 2 cache lines empty to avoid false sharing that will slow 
the consumer thread when queue is full.
+            return offerIfBelowThreshold(e, capacity() - 32);
         }
 
         @Override
@@ -81,13 +119,64 @@ public class JCToolsBlockingQueueFactory<E> implements 
BlockingQueueFactory<E> {
 
         @Override
         public E take() throws InterruptedException {
-            for (; ; ) {
-                final E result = poll();
+            int idleCounter = 100;
+            do {
+                final E result = relaxedPoll();
                 if (result != null) {
                     return result;
                 }
+                idleCounter = waitStrategy.idle(idleCounter);
+            } while (!Thread.interrupted()); //clear interrupted flag
+            throw new InterruptedException();
+        }
+    }
+
+    public enum WaitStrategy {
+        SPIN(new Idle() {
+            @Override
+            public int idle(final int idleCounter) {
+                return idleCounter + 1;
+            }
+        }),
+        YIELD(new Idle() {
+            @Override
+            public int idle(final int idleCounter) {
+                Thread.yield();
+                return idleCounter + 1;
+            }
+        }),
+        PARK(new Idle() {
+            @Override
+            public int idle(final int idleCounter) {
                 LockSupport.parkNanos(1L);
+                return idleCounter + 1;
             }
+        }),
+        PROGRESSIVE(new Idle() {
+            @Override
+            public int idle(final int idleCounter) {
+                if (idleCounter > 200) {
+                    LockSupport.parkNanos(1L);
+                } else if (idleCounter > 100) {
+                    Thread.yield();
+                }
+                return idleCounter + 1;
+            }
+        });
+
+        private final Idle idle;
+
+        private int idle(final int idleCounter) {
+            return idle.idle(idleCounter);
+        }
+
+        WaitStrategy(final Idle idle) {
+            this.idle = idle;
         }
     }
+
+    private interface Idle {
+        int idle(int idleCounter);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/67d78505/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/AbstractRunQueue.java
----------------------------------------------------------------------
diff --git 
a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/AbstractRunQueue.java
 
b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/AbstractRunQueue.java
new file mode 100644
index 0000000..15ba61b
--- /dev/null
+++ 
b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/AbstractRunQueue.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.async.perftest;
+
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+
+import 
org.apache.logging.log4j.core.async.perftest.ResponseTimeTest.PrintingAsyncQueueFullPolicy;
+
+import com.lmax.disruptor.collections.Histogram;
+
+public abstract class AbstractRunQueue implements IPerfTestRunner {
+
+    abstract BlockingQueue<String> createQueue(int capacity);
+
+    private static final String STOP = "STOP_TEST";
+    private volatile boolean stopped = false;
+    private final BlockingQueue<String> queue = createQueue(256 * 1024);
+    private final Thread backGroundThread;
+
+    AbstractRunQueue() {
+        backGroundThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                for (; ; ) {
+                    try {
+                        if (Objects.equals(queue.take(), STOP)) {
+                            break;
+                        }
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                        break;
+                    }
+                }
+            }
+        });
+        backGroundThread.start();
+    }
+
+    @Override
+    public void runThroughputTest(final int lines, final Histogram histogram) {
+    }
+
+
+    @Override
+    public void runLatencyTest(final int samples, final Histogram histogram,
+                               final long nanoTimeCost, final int threadCount) 
{
+    }
+
+
+    @Override
+    public final void shutdown() {
+        stopped = true;
+        try {
+            queue.put(STOP);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+
+    @Override
+    public final void log(final String finalMessage) {
+        if (stopped) {
+            return;
+        }
+        if (!queue.offer(finalMessage)) {
+            PrintingAsyncQueueFullPolicy.ringbufferFull.incrementAndGet();
+            try {
+                queue.put(finalMessage);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/67d78505/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/ResponseTimeTest.java
----------------------------------------------------------------------
diff --git 
a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/ResponseTimeTest.java
 
b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/ResponseTimeTest.java
index c09c428..93f0f2e 100644
--- 
a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/ResponseTimeTest.java
+++ 
b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/ResponseTimeTest.java
@@ -156,6 +156,7 @@ public class ResponseTimeTest {
         final int COUNT = (1000 * 1000) / threadCount;
         runLatencyTest(logger, TEST_DURATION_MILLIS, COUNT, 
loadMessagesPerSec, idleStrategy, serviceTmHistograms,
                 responseTmHistograms, threadCount);
+        logger.shutdown();
         final long end = System.currentTimeMillis();
 
         // ... and report the results

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/67d78505/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunConversant.java
----------------------------------------------------------------------
diff --git 
a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunConversant.java
 
b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunConversant.java
new file mode 100644
index 0000000..9b2adff
--- /dev/null
+++ 
b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunConversant.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.async.perftest;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.logging.log4j.core.async.DisruptorBlockingQueueFactory;
+
+import com.conversantmedia.util.concurrent.SpinPolicy;
+
+public class RunConversant extends AbstractRunQueue {
+
+    @Override
+    BlockingQueue<String> createQueue(int capacity) {
+        return 
DisruptorBlockingQueueFactory.<String>createFactory(SpinPolicy.SPINNING).create(capacity);
+    }
+}

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/67d78505/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunJCTools.java
----------------------------------------------------------------------
diff --git 
a/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunJCTools.java
 
b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunJCTools.java
new file mode 100644
index 0000000..a5a632c
--- /dev/null
+++ 
b/log4j-core/src/test/java/org/apache/logging/log4j/core/async/perftest/RunJCTools.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.async.perftest;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.logging.log4j.core.async.JCToolsBlockingQueueFactory;
+import 
org.apache.logging.log4j.core.async.JCToolsBlockingQueueFactory.WaitStrategy;
+
+public class RunJCTools extends AbstractRunQueue {
+
+    @Override
+    BlockingQueue<String> createQueue(int capacity) {
+        return 
JCToolsBlockingQueueFactory.<String>createFactory(WaitStrategy.SPIN).create(capacity);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/67d78505/log4j-perf/src/main/resources/perf5AsyncApndMpscQNoLoc-noOpAppender.xml
----------------------------------------------------------------------
diff --git 
a/log4j-perf/src/main/resources/perf5AsyncApndMpscQNoLoc-noOpAppender.xml 
b/log4j-perf/src/main/resources/perf5AsyncApndMpscQNoLoc-noOpAppender.xml
index 26cd946..372c8bd 100644
--- a/log4j-perf/src/main/resources/perf5AsyncApndMpscQNoLoc-noOpAppender.xml
+++ b/log4j-perf/src/main/resources/perf5AsyncApndMpscQNoLoc-noOpAppender.xml
@@ -21,7 +21,7 @@
     </CountingNoOp>
     <Async name="Async" blocking="true" bufferSize="262144">
       <appender-ref ref="NoOp"/>
-      <JCToolsBlockingQueue/>
+      <JCToolsBlockingQueue waitStrategy="SPIN"/>
     </Async>
   </Appenders>
   <Loggers>

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/67d78505/log4j-perf/src/main/resources/perf5AsyncApndMpscQWithLoc-noOpAppender.xml
----------------------------------------------------------------------
diff --git 
a/log4j-perf/src/main/resources/perf5AsyncApndMpscQWithLoc-noOpAppender.xml 
b/log4j-perf/src/main/resources/perf5AsyncApndMpscQWithLoc-noOpAppender.xml
index 3bf3c98..c2f6de0 100644
--- a/log4j-perf/src/main/resources/perf5AsyncApndMpscQWithLoc-noOpAppender.xml
+++ b/log4j-perf/src/main/resources/perf5AsyncApndMpscQWithLoc-noOpAppender.xml
@@ -21,7 +21,7 @@
     </CountingNoOp>
     <Async name="Async"  blocking="true" bufferSize="262144" 
includeLocation="true">
       <appender-ref ref="NoOp"/>
-      <JCToolsBlockingQueue/>
+      <JCToolsBlockingQueue waitStrategy="SPIN"/>
     </Async>
   </Appenders>
   <Loggers>

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/67d78505/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ec6bf85..b95be66 100644
--- a/pom.xml
+++ b/pom.xml
@@ -225,7 +225,7 @@
     <activemq.version>5.13.2</activemq.version>
     <!-- Allow Clirr severity to be overriden by the command-line option 
-DminSeverity=level -->
     <minSeverity>info</minSeverity>
-    <jctoolsVersion>1.2</jctoolsVersion>
+    <jctoolsVersion>1.2.1</jctoolsVersion>
   </properties>
   <pluginRepositories>
     <pluginRepository>

Reply via email to