This is an automated email from the ASF dual-hosted git repository. vy pushed a commit to branch api-queue in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git
commit 8954e0770a7824e6199d515dcb63db11c040dcd3 Author: Volkan Yazıcı <[email protected]> AuthorDate: Tue Dec 12 11:48:15 2023 +0100 Remove the need for an SPSC queue --- .../logging/log4j/internal/ArrayQueueTest.java | 101 +++++++++++++++++++++ .../apache/logging/log4j/internal/ArrayQueue.java | 100 ++++++++++++++++++++ .../logging/log4j/internal/QueueFactories.java | 6 -- .../log4j/internal/ThreadLocalRecyclerFactory.java | 11 ++- 4 files changed, 207 insertions(+), 11 deletions(-) diff --git a/log4j-api-test/src/test/java/org/apache/logging/log4j/internal/ArrayQueueTest.java b/log4j-api-test/src/test/java/org/apache/logging/log4j/internal/ArrayQueueTest.java new file mode 100644 index 0000000000..e063e7f81c --- /dev/null +++ b/log4j-api-test/src/test/java/org/apache/logging/log4j/internal/ArrayQueueTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.logging.log4j.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.Queue; +import java.util.Random; +import java.util.concurrent.ArrayBlockingQueue; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +public class ArrayQueueTest { + + @ParameterizedTest + @ValueSource(ints = {-1, 0}) + void invalid_capacity_should_not_be_allowed(final int invalidCapacity) { + assertThatThrownBy(() -> new ArrayQueue<>(invalidCapacity)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("invalid capacity: " + invalidCapacity); + } + + @Test + void should_work_with_capacity_1() { + + // Verify initials + final Queue<String> queue = new ArrayQueue<>(1); + assertThat(queue.size()).isEqualTo(0); + assertThat(queue.peek()).isNull(); + assertThat(queue.poll()).isNull(); + assertThat(queue).isEmpty(); + + // Verify enqueue & deque + assertThat(queue.offer("foo")).isTrue(); + assertThat(queue.offer("bar")).isFalse(); + assertThat(queue.size()).isEqualTo(1); + assertThat(queue).containsOnly("foo"); + assertThat(queue.peek()).isEqualTo("foo"); + assertThat(queue.poll()).isEqualTo("foo"); + + // Verify final state + assertThat(queue.size()).isEqualTo(0); + assertThat(queue.peek()).isNull(); + assertThat(queue.poll()).isNull(); + assertThat(queue).isEmpty(); + } + + @ParameterizedTest + @CsvSource({ + "1,0.3", "1,0.5", "1,0.8", "2,0.3", "2,0.5", "2,0.8", "3,0.3", "3,0.5", "3,0.8", "4,0.3", "4,0.5", "4,0.8" + }) + void ops_should_match_with_std_lib(final int capacity, final double pollRatio) { + + // Set the stage + final Random random = new Random(0); + final int opCount = random.nextInt(100); + final Queue<String> queueRef = new ArrayBlockingQueue<>(capacity); + final Queue<String> queueTarget = new ArrayQueue<>(capacity); + + for (int opIndex = 0; opIndex < opCount; opIndex++) { + + // Verify entry + assertThat(queueTarget.size()).isEqualTo(queueRef.size()); + assertThat(queueTarget.peek()).isEqualTo(queueRef.peek()); + assertThat(queueTarget).containsExactlyElementsOf(queueRef); + + // Is this a `poll()`? + if (pollRatio >= random.nextDouble()) { + assertThat(queueTarget.poll()).isEqualTo(queueRef.poll()); + } + + // Then this is an `offer()` + else { + final String item = "op@" + opIndex; + assertThat(queueTarget.offer(item)).isEqualTo(queueRef.offer(item)); + } + + // Verify exit + assertThat(queueTarget.size()).isEqualTo(queueRef.size()); + assertThat(queueTarget.peek()).isEqualTo(queueRef.peek()); + assertThat(queueTarget).containsExactlyElementsOf(queueRef); + } + } +} diff --git a/log4j-api/src/main/java/org/apache/logging/log4j/internal/ArrayQueue.java b/log4j-api/src/main/java/org/apache/logging/log4j/internal/ArrayQueue.java new file mode 100644 index 0000000000..238efa8796 --- /dev/null +++ b/log4j-api/src/main/java/org/apache/logging/log4j/internal/ArrayQueue.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.logging.log4j.internal; + +import java.util.AbstractQueue; +import java.util.Iterator; +import java.util.stream.IntStream; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.logging.log4j.util.InternalApi; + +/** + * An array-backed, fixed-length, not-thread-safe {@link java.util.Queue} implementation. + * + * @param <E> the element type + */ +@InternalApi +@NotThreadSafe +final class ArrayQueue<E> extends AbstractQueue<E> { + + private final E[] buffer; + + private int head; + + private int tail; + + private int size; + + @SuppressWarnings("unchecked") + ArrayQueue(final int capacity) { + if (capacity < 1) { + throw new IllegalArgumentException("invalid capacity: " + capacity); + } + buffer = (E[]) new Object[capacity]; + head = 0; + tail = -1; + size = 0; + } + + @Override + public Iterator<E> iterator() { + int[] i = {head}; + return IntStream.range(0, size) + .mapToObj(ignored -> { + final E item = buffer[i[0]]; + i[0] = (i[0] + 1) % buffer.length; + return item; + }) + .iterator(); + } + + @Override + public boolean offer(final E item) { + if (size == buffer.length) { + return false; + } + tail = (tail + 1) % buffer.length; + buffer[tail] = item; + size++; + return true; + } + + @Override + public E poll() { + if (isEmpty()) { + return null; + } + final E item = buffer[head]; + buffer[head] = null; // Clear refs for GC + head = (head + 1) % buffer.length; + size--; + return item; + } + + @Override + public E peek() { + if (isEmpty()) { + return null; + } + return buffer[head]; + } + + @Override + public int size() { + return size; + } +} diff --git a/log4j-api/src/main/java/org/apache/logging/log4j/internal/QueueFactories.java b/log4j-api/src/main/java/org/apache/logging/log4j/internal/QueueFactories.java index 3db9e5d6e9..3d0313057b 100644 --- a/log4j-api/src/main/java/org/apache/logging/log4j/internal/QueueFactories.java +++ b/log4j-api/src/main/java/org/apache/logging/log4j/internal/QueueFactories.java @@ -26,7 +26,6 @@ import org.apache.logging.log4j.util.Cast; import org.apache.logging.log4j.util.InternalApi; import org.apache.logging.log4j.util.LoaderUtil; import org.jctools.queues.MpmcArrayQueue; -import org.jctools.queues.SpscArrayQueue; /** * Provides {@link QueueFactory} instances for different use cases. @@ -40,11 +39,6 @@ import org.jctools.queues.SpscArrayQueue; @InternalApi public enum QueueFactories implements QueueFactory { - /** - * Provides a bounded queue for single-producer/single-consumer usage. - */ - SPSC(() -> SpscArrayQueue::new), - /** * Provides a bounded queue for multi-producer/multi-consumer usage. */ diff --git a/log4j-api/src/main/java/org/apache/logging/log4j/internal/ThreadLocalRecyclerFactory.java b/log4j-api/src/main/java/org/apache/logging/log4j/internal/ThreadLocalRecyclerFactory.java index 56e8930547..a05e9f3166 100644 --- a/log4j-api/src/main/java/org/apache/logging/log4j/internal/ThreadLocalRecyclerFactory.java +++ b/log4j-api/src/main/java/org/apache/logging/log4j/internal/ThreadLocalRecyclerFactory.java @@ -70,17 +70,17 @@ final class ThreadLocalRecyclerFactory implements RecyclerFactory { private final Consumer<V> cleaner; - private final ThreadLocal<Queue<V>> holder; + private final ThreadLocal<Queue<V>> queueRef; private ThreadLocalRecycler(final Supplier<V> supplier, final Consumer<V> cleaner, final int capacity) { super(supplier); - this.holder = ThreadLocal.withInitial(() -> QueueFactories.SPSC.create(capacity)); + this.queueRef = ThreadLocal.withInitial(() -> new ArrayQueue<>(capacity)); this.cleaner = cleaner; } @Override public V acquire() { - final Queue<V> queue = holder.get(); + final Queue<V> queue = queueRef.get(); final V value = queue.poll(); return value != null ? value : createInstance(); } @@ -89,12 +89,13 @@ final class ThreadLocalRecyclerFactory implements RecyclerFactory { public void release(final V value) { requireNonNull(value, "value"); cleaner.accept(value); - holder.get().offer(value); + final Queue<V> queue = queueRef.get(); + queue.offer(value); } // Visible for testing Queue<V> getQueue() { - return holder.get(); + return queueRef.get(); } } }
