This is an automated email from the ASF dual-hosted git repository.

vy pushed a commit to branch api-queue
in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git

commit 8954e0770a7824e6199d515dcb63db11c040dcd3
Author: Volkan Yazıcı <[email protected]>
AuthorDate: Tue Dec 12 11:48:15 2023 +0100

    Remove the need for an SPSC queue
---
 .../logging/log4j/internal/ArrayQueueTest.java     | 101 +++++++++++++++++++++
 .../apache/logging/log4j/internal/ArrayQueue.java  | 100 ++++++++++++++++++++
 .../logging/log4j/internal/QueueFactories.java     |   6 --
 .../log4j/internal/ThreadLocalRecyclerFactory.java |  11 ++-
 4 files changed, 207 insertions(+), 11 deletions(-)

diff --git a/log4j-api-test/src/test/java/org/apache/logging/log4j/internal/ArrayQueueTest.java b/log4j-api-test/src/test/java/org/apache/logging/log4j/internal/ArrayQueueTest.java
new file mode 100644
index 0000000000..e063e7f81c
--- /dev/null
+++ b/log4j-api-test/src/test/java/org/apache/logging/log4j/internal/ArrayQueueTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.logging.log4j.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Tests {@link ArrayQueue}: constructor argument validation, the single-slot edge case,
+ * and behavioral parity with {@link ArrayBlockingQueue} over a pseudo-random sequence of operations.
+ */
+public class ArrayQueueTest {
+
+    /** Capacities below 1 must be rejected with a message naming the offending value. */
+    @ParameterizedTest
+    @ValueSource(ints = {-1, 0})
+    void invalid_capacity_should_not_be_allowed(final int invalidCapacity) {
+        final String expectedMessage = "invalid capacity: " + invalidCapacity;
+        assertThatThrownBy(() -> new ArrayQueue<>(invalidCapacity))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(expectedMessage);
+    }
+
+    /** A queue with a single slot must accept one item, refuse the second, and return to empty after polling. */
+    @Test
+    void should_work_with_capacity_1() {
+
+        // A fresh queue starts out empty
+        final Queue<String> queue = new ArrayQueue<>(1);
+        assertEmpty(queue);
+
+        // Enqueue until full, then dequeue
+        final boolean firstAccepted = queue.offer("foo");
+        assertThat(firstAccepted).isTrue();
+        final boolean secondAccepted = queue.offer("bar");
+        assertThat(secondAccepted).isFalse();
+        assertThat(queue.size()).isEqualTo(1);
+        assertThat(queue).containsOnly("foo");
+        assertThat(queue.peek()).isEqualTo("foo");
+        assertThat(queue.poll()).isEqualTo("foo");
+
+        // Draining the only item leaves the queue empty again
+        assertEmpty(queue);
+    }
+
+    /**
+     * Replays the same seeded sequence of {@code poll()} and {@code offer()} calls against both an
+     * {@link ArrayBlockingQueue} and an {@link ArrayQueue}, comparing results and state around every operation.
+     */
+    @ParameterizedTest
+    @CsvSource({
+        "1,0.3", "1,0.5", "1,0.8",
+        "2,0.3", "2,0.5", "2,0.8",
+        "3,0.3", "3,0.5", "3,0.8",
+        "4,0.3", "4,0.5", "4,0.8"
+    })
+    void ops_should_match_with_std_lib(final int capacity, final double pollRatio) {
+
+        // The fixed seed keeps the operation sequence reproducible
+        final Random random = new Random(0);
+        final int opCount = random.nextInt(100);
+        final Queue<String> expected = new ArrayBlockingQueue<>(capacity);
+        final Queue<String> actual = new ArrayQueue<>(capacity);
+
+        for (int opIndex = 0; opIndex < opCount; opIndex++) {
+
+            assertSameState(actual, expected);
+
+            // Draw once per operation to choose between `poll()` and `offer()`
+            final boolean polling = random.nextDouble() <= pollRatio;
+            if (polling) {
+                assertThat(actual.poll()).isEqualTo(expected.poll());
+            } else {
+                final String item = "op@" + opIndex;
+                assertThat(actual.offer(item)).isEqualTo(expected.offer(item));
+            }
+
+            assertSameState(actual, expected);
+        }
+    }
+
+    /** Asserts that the given queue reports no content through any of its accessors. */
+    private static void assertEmpty(final Queue<String> queue) {
+        assertThat(queue.size()).isEqualTo(0);
+        assertThat(queue.peek()).isNull();
+        assertThat(queue.poll()).isNull();
+        assertThat(queue).isEmpty();
+    }
+
+    /** Asserts that both queues agree on size, head element, and iteration order. */
+    private static void assertSameState(final Queue<String> actual, final Queue<String> expected) {
+        assertThat(actual.size()).isEqualTo(expected.size());
+        assertThat(actual.peek()).isEqualTo(expected.peek());
+        assertThat(actual).containsExactlyElementsOf(expected);
+    }
+}
diff --git a/log4j-api/src/main/java/org/apache/logging/log4j/internal/ArrayQueue.java b/log4j-api/src/main/java/org/apache/logging/log4j/internal/ArrayQueue.java
new file mode 100644
index 0000000000..238efa8796
--- /dev/null
+++ b/log4j-api/src/main/java/org/apache/logging/log4j/internal/ArrayQueue.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.logging.log4j.internal;
+
+import java.util.AbstractQueue;
+import java.util.Iterator;
+import java.util.stream.IntStream;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.logging.log4j.util.InternalApi;
+
+/**
+ * An array-backed, fixed-length, not-thread-safe {@link java.util.Queue} implementation.
+ * <p>
+ * Elements are kept in a circular buffer: {@code head} is the index of the oldest element,
+ * {@code tail} is the index of the most recently inserted one, and {@code size} is the element count.
+ * {@code null} elements are rejected, since {@link #poll()} and {@link #peek()} return {@code null}
+ * to signal an empty queue.
+ * </p>
+ *
+ * @param <E> the element type
+ */
+@InternalApi
+@NotThreadSafe
+final class ArrayQueue<E> extends AbstractQueue<E> {
+
+    private final E[] buffer;
+
+    private int head;
+
+    private int tail;
+
+    private int size;
+
+    /**
+     * Creates an empty queue that can hold at most {@code capacity} elements.
+     *
+     * @param capacity the maximum number of elements
+     * @throws IllegalArgumentException if {@code capacity} is less than 1
+     */
+    @SuppressWarnings("unchecked")
+    ArrayQueue(final int capacity) {
+        if (capacity < 1) {
+            throw new IllegalArgumentException("invalid capacity: " + capacity);
+        }
+        // A generic array cannot be created directly; this one is never handed out, so the cast is safe
+        buffer = (E[]) new Object[capacity];
+        head = 0;
+        tail = -1; // The first successful `offer()` advances this to index 0
+        size = 0;
+    }
+
+    /**
+     * Returns an iterator over the elements, from the oldest to the newest.
+     * <p>
+     * The head position and the element count are captured when this method is called; the slots
+     * themselves are read while iterating. Modifying the queue during iteration is not detected.
+     * </p>
+     *
+     * @return an iterator over the queued elements in insertion order
+     */
+    @Override
+    public Iterator<E> iterator() {
+        final int start = head;
+        // Number of slots between the head and the end of the array; offsets beyond it wrap to the front
+        final int wrapAt = buffer.length - start;
+        return IntStream.range(0, size)
+                // Stateless mapping from logical offset to physical index; written without `%` to avoid overflow
+                .mapToObj(offset -> buffer[offset < wrapAt ? start + offset : offset - wrapAt])
+                .iterator();
+    }
+
+    /**
+     * Appends the given item if there is room for it.
+     *
+     * @param item the element to add, must not be {@code null}
+     * @return {@code true} if the item was stored, {@code false} if the queue is full
+     * @throws NullPointerException if {@code item} is {@code null}
+     */
+    @Override
+    public boolean offer(final E item) {
+        // A stored `null` would be indistinguishable from the "empty" result of `poll()` and `peek()`
+        if (item == null) {
+            throw new NullPointerException("item");
+        }
+        if (size == buffer.length) {
+            return false;
+        }
+        tail = (tail + 1) % buffer.length;
+        buffer[tail] = item;
+        size++;
+        return true;
+    }
+
+    /**
+     * Removes and returns the oldest element.
+     *
+     * @return the removed element, or {@code null} if the queue is empty
+     */
+    @Override
+    public E poll() {
+        if (isEmpty()) {
+            return null;
+        }
+        final E item = buffer[head];
+        buffer[head] = null; // Clear refs for GC
+        head = (head + 1) % buffer.length;
+        size--;
+        return item;
+    }
+
+    /**
+     * Returns the oldest element without removing it.
+     *
+     * @return the oldest element, or {@code null} if the queue is empty
+     */
+    @Override
+    public E peek() {
+        if (isEmpty()) {
+            return null;
+        }
+        return buffer[head];
+    }
+
+    /**
+     * Returns the number of elements currently stored.
+     *
+     * @return the element count
+     */
+    @Override
+    public int size() {
+        return size;
+    }
+}
diff --git a/log4j-api/src/main/java/org/apache/logging/log4j/internal/QueueFactories.java b/log4j-api/src/main/java/org/apache/logging/log4j/internal/QueueFactories.java
index 3db9e5d6e9..3d0313057b 100644
--- a/log4j-api/src/main/java/org/apache/logging/log4j/internal/QueueFactories.java
+++ b/log4j-api/src/main/java/org/apache/logging/log4j/internal/QueueFactories.java
@@ -26,7 +26,6 @@ import org.apache.logging.log4j.util.Cast;
 import org.apache.logging.log4j.util.InternalApi;
 import org.apache.logging.log4j.util.LoaderUtil;
 import org.jctools.queues.MpmcArrayQueue;
-import org.jctools.queues.SpscArrayQueue;
 
 /**
  * Provides {@link QueueFactory} instances for different use cases.
@@ -40,11 +39,6 @@ import org.jctools.queues.SpscArrayQueue;
 @InternalApi
 public enum QueueFactories implements QueueFactory {
 
-    /**
-     * Provides a bounded queue for single-producer/single-consumer usage.
-     */
-    SPSC(() -> SpscArrayQueue::new),
-
     /**
      * Provides a bounded queue for multi-producer/multi-consumer usage.
      */
diff --git a/log4j-api/src/main/java/org/apache/logging/log4j/internal/ThreadLocalRecyclerFactory.java b/log4j-api/src/main/java/org/apache/logging/log4j/internal/ThreadLocalRecyclerFactory.java
index 56e8930547..a05e9f3166 100644
--- a/log4j-api/src/main/java/org/apache/logging/log4j/internal/ThreadLocalRecyclerFactory.java
+++ b/log4j-api/src/main/java/org/apache/logging/log4j/internal/ThreadLocalRecyclerFactory.java
@@ -70,17 +70,17 @@ final class ThreadLocalRecyclerFactory implements RecyclerFactory {
 
         private final Consumer<V> cleaner;
 
-        private final ThreadLocal<Queue<V>> holder;
+        private final ThreadLocal<Queue<V>> queueRef;
 
         private ThreadLocalRecycler(final Supplier<V> supplier, final 
Consumer<V> cleaner, final int capacity) {
             super(supplier);
-            this.holder = ThreadLocal.withInitial(() -> 
QueueFactories.SPSC.create(capacity));
+            this.queueRef = ThreadLocal.withInitial(() -> new 
ArrayQueue<>(capacity));
             this.cleaner = cleaner;
         }
 
         @Override
         public V acquire() {
-            final Queue<V> queue = holder.get();
+            final Queue<V> queue = queueRef.get();
             final V value = queue.poll();
             return value != null ? value : createInstance();
         }
@@ -89,12 +89,13 @@ final class ThreadLocalRecyclerFactory implements RecyclerFactory {
         public void release(final V value) {
             requireNonNull(value, "value");
             cleaner.accept(value);
-            holder.get().offer(value);
+            final Queue<V> queue = queueRef.get();
+            queue.offer(value);
         }
 
         // Visible for testing
         Queue<V> getQueue() {
-            return holder.get();
+            return queueRef.get();
         }
     }
 }

Reply via email to