This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
commit 20ef704a6def7119629a45d033a3605a57c42ac7 Author: Timothy Bish <[email protected]> AuthorDate: Fri May 2 11:49:46 2025 -0400 PROTON-2887 Remove some unused code from DeliveryQueue Remove some older unused code in the message passing delivery queue and clean up a bit. --- .../qpid/protonj2/client/util/DeliveryQueue.java | 19 --- .../protonj2/client/util/FifoDeliveryQueue.java | 153 +++++++-------------- 2 files changed, 52 insertions(+), 120 deletions(-) diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/DeliveryQueue.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/DeliveryQueue.java index 70dcb9d5..b5a551ca 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/DeliveryQueue.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/DeliveryQueue.java @@ -33,14 +33,6 @@ public interface DeliveryQueue { */ void enqueue(ClientDelivery delivery); - /** - * Adds the given {@link Delivery} to the front of the queue. - * - * @param delivery - * The in-bound Delivery to enqueue. - */ - void enqueueFirst(ClientDelivery delivery); - /** * Used to get an {@link Delivery}. The amount of time this method blocks is based on the timeout value * that is supplied to it. @@ -89,22 +81,11 @@ public interface DeliveryQueue { */ void stop(); - /** - * Closes the Delivery Queue. No Delivery can be added or removed from the Queue - * once it has entered the closed state. - */ - void close(); - /** * @return true if the Queue is not in the stopped or closed state. */ boolean isRunning(); - /** - * @return true if the Queue has been closed. - */ - boolean isClosed(); - /** * @return true if there are no deliveries in the queue. */ diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java index 2bbb8f40..a68ad9a7 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java @@ -18,7 +18,6 @@ package org.apache.qpid.protonj2.client.util; import java.util.ArrayDeque; import java.util.Deque; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.qpid.protonj2.client.Delivery; import org.apache.qpid.protonj2.client.impl.ClientDelivery; @@ -28,19 +27,11 @@ import org.apache.qpid.protonj2.client.impl.ClientDelivery; */ public final class FifoDeliveryQueue implements DeliveryQueue { - private static final AtomicIntegerFieldUpdater<FifoDeliveryQueue> STATE_FIELD_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(FifoDeliveryQueue.class, "state"); - - private static final int CLOSED = 0; - private static final int STOPPED = 1; - private static final int RUNNING = 2; - - private volatile int state = STOPPED; + private final Deque<ClientDelivery> queue; + private volatile boolean started; private int waiters = 0; - private final Deque<ClientDelivery> queue; - /** * Creates a new first in / first out message queue with the given queue depth * @@ -52,136 +43,96 @@ public final class FifoDeliveryQueue implements DeliveryQueue { } @Override - public void enqueueFirst(ClientDelivery envelope) { - synchronized (queue) { - queue.addFirst(envelope); - if (waiters > 0) { - queue.notify(); - } - } - } - - @Override - public void enqueue(ClientDelivery envelope) { - synchronized (queue) { - queue.addLast(envelope); - if (waiters > 0) { - queue.notify(); - } + public synchronized void enqueue(ClientDelivery envelope) { + queue.addLast(envelope); + if (waiters > 0) { + notify(); } } @Override - public ClientDelivery dequeue(long timeout) throws InterruptedException { - synchronized (queue) { - // Wait until the receiver is ready to deliver messages. - while (timeout != 0 && isRunning() && queue.isEmpty()) { - if (timeout == -1) { - waiters++; - try { - queue.wait(); - } finally { - waiters--; - } - } else { - long start = System.currentTimeMillis(); - waiters++; - try { - queue.wait(timeout); - } finally { - waiters--; - } - timeout = Math.max(timeout + start - System.currentTimeMillis(), 0); + public synchronized ClientDelivery dequeue(long timeout) throws InterruptedException { + // Wait until the receiver is ready to deliver messages. + while (queue.isEmpty() && timeout != 0 && started) { + if (timeout == -1) { + waiters++; + try { + wait(); + } finally { + waiters--; } + } else { + long start = System.currentTimeMillis(); + waiters++; + try { + wait(timeout); + } finally { + waiters--; + } + timeout = Math.max(timeout + start - System.currentTimeMillis(), 0); } + } - if (!isRunning()) { - return null; - } - + if (started) { return queue.pollFirst(); + } else { + return null; } } @Override - public ClientDelivery dequeueNoWait() { - synchronized (queue) { - if (!isRunning()) { - return null; - } - + public synchronized ClientDelivery dequeueNoWait() { + if (started) { return queue.pollFirst(); + } else { + return null; } } @Override - public void start() { - if (STATE_FIELD_UPDATER.compareAndSet(this, STOPPED, RUNNING)) { - synchronized (queue) { - if (waiters > 0) { - queue.notifyAll(); - } - } - } - } + public synchronized void start() { + if (!started) { + started = true; - @Override - public void stop() { - if (STATE_FIELD_UPDATER.compareAndSet(this, RUNNING, STOPPED)) { - synchronized (queue) { - if (waiters > 0) { - queue.notifyAll(); - } + if (waiters > 0) { + notifyAll(); } } } @Override - public void close() { - if (STATE_FIELD_UPDATER.getAndSet(this, CLOSED) > CLOSED) { - synchronized (queue) { - if (waiters > 0) { - queue.notifyAll(); - } + public synchronized void stop() { + if (started) { + started = false; + + if (waiters > 0) { + notifyAll(); } } } @Override public boolean isRunning() { - return state == RUNNING; + return started; } @Override - public boolean isClosed() { - return state == CLOSED; + public synchronized boolean isEmpty() { + return queue.isEmpty(); } @Override - public boolean isEmpty() { - synchronized (queue) { - return queue.isEmpty(); - } + public synchronized int size() { + return queue.size(); } @Override - public int size() { - synchronized (queue) { - return queue.size(); - } + public synchronized void clear() { + queue.clear(); } @Override - public void clear() { - synchronized (queue) { - queue.clear(); - } - } - - @Override - public String toString() { - synchronized (queue) { - return queue.toString(); - } + public synchronized String toString() { + return queue.toString(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
