This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new cd46594 ZOOKEEPER-3340: Introduce CircularBlockingQueue in
QuorumCnxManager.java
cd46594 is described below
commit cd465947949f64d4258184d7b493a7e8747c6262
Author: Beluga Behr <[email protected]>
AuthorDate: Wed Nov 13 20:41:47 2019 +0100
ZOOKEEPER-3340: Introduce CircularBlockingQueue in QuorumCnxManager.java
There are many caveats and comments regarding Exception thrown 'which is
safe to ignore'. Added a new data structure that removes all of these comments
and will perform without generating exceptions while more clearly implementing
the desired behavior without the caveats.
Author: Beluga Behr <[email protected]>
Author: David Mollitor <[email protected]>
Author: David Mollitor <[email protected]>
Reviewers: [email protected], [email protected]
Closes #880 from belugabehr/ZOOKEEPER-3340 and squashes the following
commits:
7b74dac27 [David Mollitor] Changed comment from conrete ArrayBlockingQueue
to BlockingQueue
38d7e3f05 [David Mollitor] Shutdown ExecutorService in test
f5ea2b6b1 [David Mollitor] Updated to fix checkstyle ImportOrder: Extra
separation
b3ac3cc44 [David Mollitor] Rebased to master
c63ab852c [Beluga Behr] Added updates from code review
d8877803b [Beluga Behr] Introduce new class CircularBlockingQueue
2793db2f1 [Beluga Behr] Added additional logging
ce96b7071 [Beluga Behr] ZOOKEEPER-3340: Improve Queue Usage in
QuorumCnxManager.java
---
.../zookeeper/server/quorum/QuorumCnxManager.java | 136 ++++------
.../zookeeper/util/CircularBlockingQueue.java | 277 +++++++++++++++++++++
.../zookeeper/util/TestCircularBlockingQueue.java | 76 ++++++
3 files changed, 399 insertions(+), 90 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
index 2327445..e37986d 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
@@ -36,9 +36,8 @@ import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@@ -55,6 +54,7 @@ import
org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ConfigUtils;
+import org.apache.zookeeper.util.CircularBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -137,17 +137,13 @@ public class QuorumCnxManager {
* Mapping from Peer to Thread number
*/
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
- final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
+ final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
/*
* Reception queue
*/
- public final ArrayBlockingQueue<Message> recvQueue;
- /*
- * Object to synchronize access to recvQueue
- */
- private final Object recvQLock = new Object();
+ public final BlockingQueue<Message> recvQueue;
/*
* Shutdown flag
@@ -253,10 +249,10 @@ public class QuorumCnxManager {
}
public QuorumCnxManager(QuorumPeer self, final long mySid, Map<Long,
QuorumPeer.QuorumServer> view, QuorumAuthServer authServer, QuorumAuthLearner
authLearner, int socketTimeout, boolean listenOnAllIPs, int
quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) {
- this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
- this.queueSendMap = new ConcurrentHashMap<Long,
ArrayBlockingQueue<ByteBuffer>>();
- this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
- this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
+ this.recvQueue = new CircularBlockingQueue<>(RECV_CAPACITY);
+ this.queueSendMap = new ConcurrentHashMap<>();
+ this.senderWorkerMap = new ConcurrentHashMap<>();
+ this.lastMessageSent = new ConcurrentHashMap<>();
String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
if (cnxToValue != null) {
@@ -438,7 +434,8 @@ public class QuorumCnxManager {
}
senderWorkerMap.put(sid, sw);
- queueSendMap.putIfAbsent(sid, new
ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
+
+ queueSendMap.putIfAbsent(sid, new
CircularBlockingQueue<>(SEND_CAPACITY));
sw.start();
rw.start();
@@ -573,7 +570,7 @@ public class QuorumCnxManager {
senderWorkerMap.put(sid, sw);
- queueSendMap.putIfAbsent(sid, new
ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
+ queueSendMap.putIfAbsent(sid, new
CircularBlockingQueue<>(SEND_CAPACITY));
sw.start();
rw.start();
@@ -598,10 +595,9 @@ public class QuorumCnxManager {
/*
* Start a new connection if doesn't have one already.
*/
- ArrayBlockingQueue<ByteBuffer> bq =
queueSendMap.computeIfAbsent(sid, serverId -> new
ArrayBlockingQueue<>(SEND_CAPACITY));
+ BlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid,
serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
addToSendQueue(bq, b);
connectOne(sid);
-
}
}
@@ -724,9 +720,10 @@ public class QuorumCnxManager {
* Check if all queues are empty, indicating that all messages have been
delivered.
*/
boolean haveDelivered() {
- for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
- LOG.debug("Queue size: {}", queue.size());
- if (queue.size() == 0) {
+ for (BlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
+ final int queueSize = queue.size();
+ LOG.debug("Queue size: {}", queueSize);
+ if (queueSize == 0) {
return true;
}
}
@@ -1085,7 +1082,7 @@ public class QuorumCnxManager {
* message than that stored in lastMessage. To avoid sending
* stale message, we should send the message in the send queue.
*/
- ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
+ BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq == null || isSendQueueEmpty(bq)) {
ByteBuffer b = lastMessageSent.get(sid);
if (b != null) {
@@ -1103,7 +1100,7 @@ public class QuorumCnxManager {
ByteBuffer b = null;
try {
- ArrayBlockingQueue<ByteBuffer> bq =
queueSendMap.get(sid);
+ BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq != null) {
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
} else {
@@ -1216,37 +1213,19 @@ public class QuorumCnxManager {
}
/**
- * Inserts an element in the specified queue. If the Queue is full, this
- * method removes an element from the head of the Queue and then inserts
- * the element at the tail. It can happen that an element is removed
- * by another thread in {@link SendWorker#run() }
- * method before this method attempts to remove an element from the queue.
- * This will cause {@link ArrayBlockingQueue#remove() remove} to throw an
- * exception, which is safe to ignore.
- *
- * Unlike {@link #addToRecvQueue(Message) addToRecvQueue} this method does
- * not need to be synchronized since there is only one thread that inserts
- * an element in the queue and another thread that reads from the queue.
+ * Inserts an element in the provided {@link BlockingQueue}. This method
+ * assumes that if the Queue is full, an element from the head of the
Queue is
+ * removed and the new item is inserted at the tail of the queue. This is
done
+ * to prevent a thread from blocking while inserting an element in the
queue.
*
- * @param queue
- * Reference to the Queue
- * @param buffer
- * Reference to the buffer to be inserted in the queue
+ * @param queue Reference to the Queue
+ * @param buffer Reference to the buffer to be inserted in the queue
*/
- private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
ByteBuffer buffer) {
- if (queue.remainingCapacity() == 0) {
- try {
- queue.remove();
- } catch (NoSuchElementException ne) {
- // element could be removed by poll()
- LOG.debug("Trying to remove from an empty Queue. Ignoring
exception.", ne);
- }
- }
- try {
- queue.add(buffer);
- } catch (IllegalStateException ie) {
- // This should never happen
- LOG.error("Unable to insert an element in the queue ", ie);
+ private void addToSendQueue(final BlockingQueue<ByteBuffer> queue,
+ final ByteBuffer buffer) {
+ final boolean success = queue.offer(buffer);
+ if (!success) {
+ throw new RuntimeException("Could not insert into receive queue");
}
}
@@ -1257,7 +1236,7 @@ public class QuorumCnxManager {
* @return
* true if the specified queue is empty
*/
- private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
+ private boolean isSendQueueEmpty(final BlockingQueue<ByteBuffer> queue) {
return queue.isEmpty();
}
@@ -1266,49 +1245,25 @@ public class QuorumCnxManager {
* waiting up to the specified wait time if necessary for an element to
* become available.
*
- * {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
+ * {@link BlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
*/
- private ByteBuffer pollSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
long timeout, TimeUnit unit) throws InterruptedException {
- return queue.poll(timeout, unit);
+ private ByteBuffer pollSendQueue(final BlockingQueue<ByteBuffer> queue,
+ final long timeout, final TimeUnit unit) throws InterruptedException
{
+ return queue.poll(timeout, unit);
}
/**
* Inserts an element in the {@link #recvQueue}. If the Queue is full, this
- * methods removes an element from the head of the Queue and then inserts
- * the element at the tail of the queue.
- *
- * This method is synchronized to achieve fairness between two threads that
- * are trying to insert an element in the queue. Each thread checks if the
- * queue is full, then removes the element at the head of the queue, and
- * then inserts an element at the tail. This three-step process is done to
- * prevent a thread from blocking while inserting an element in the queue.
- * If we do not synchronize the call to this method, then a thread can grab
- * a slot in the queue created by the second thread. This can cause the
call
- * to insert by the second thread to fail.
- * Note that synchronizing this method does not block another thread
- * from polling the queue since that synchronization is provided by the
- * queue itself.
+ * methods removes an element from the head of the Queue and then inserts
the
+ * element at the tail of the queue.
*
- * @param msg
- * Reference to the message to be inserted in the queue
+ * @param msg Reference to the message to be inserted in the queue
*/
- public void addToRecvQueue(Message msg) {
- synchronized (recvQLock) {
- if (recvQueue.remainingCapacity() == 0) {
- try {
- recvQueue.remove();
- } catch (NoSuchElementException ne) {
- // element could be removed by poll()
- LOG.debug("Trying to remove from an empty recvQueue.
Ignoring exception.", ne);
- }
- }
- try {
- recvQueue.add(msg);
- } catch (IllegalStateException ie) {
- // This should never happen
- LOG.error("Unable to insert element in the recvQueue ", ie);
- }
- }
+ public void addToRecvQueue(final Message msg) {
+ final boolean success = this.recvQueue.offer(msg);
+ if (!success) {
+ throw new RuntimeException("Could not insert into receive queue");
+ }
}
/**
@@ -1316,10 +1271,11 @@ public class QuorumCnxManager {
* waiting up to the specified wait time if necessary for an element to
* become available.
*
- * {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
+ * {@link BlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
*/
- public Message pollRecvQueue(long timeout, TimeUnit unit) throws
InterruptedException {
- return recvQueue.poll(timeout, unit);
+ public Message pollRecvQueue(final long timeout, final TimeUnit unit)
+ throws InterruptedException {
+ return this.recvQueue.poll(timeout, unit);
}
public boolean connectedToPeer(long peerSid) {
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/util/CircularBlockingQueue.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/util/CircularBlockingQueue.java
new file mode 100644
index 0000000..cbacb65
--- /dev/null
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/util/CircularBlockingQueue.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.util;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A bounded blocking queue backed by an array. This queue orders elements FIFO
+ * (first-in-first-out). The head of the queue is that element that has been on
+ * the queue the longest time. The tail of the queue is that element that has
+ * been on the queue the shortest time. New elements are inserted at the tail
of
+ * the queue, and the queue retrieval operations obtain elements at the head of
+ * the queue. If the queue is full, the head of the queue (the oldest element)
+ * will be removed to make room for the newest element.
+ */
+public class CircularBlockingQueue<E> implements BlockingQueue<E> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CircularBlockingQueue.class);
+
+ /** Main lock guarding all access */
+ private final ReentrantLock lock;
+
+ /** Condition for waiting takes */
+ private final Condition notEmpty;
+
+ /** The array-backed queue */
+ private final ArrayDeque<E> queue;
+
+ private final int maxSize;
+
+ private long droppedCount;
+
+ public CircularBlockingQueue(int queueSize) {
+ this.queue = new ArrayDeque<>(queueSize);
+ this.maxSize = queueSize;
+
+ this.lock = new ReentrantLock();
+ this.notEmpty = this.lock.newCondition();
+ this.droppedCount = 0L;
+ }
+
+ /**
+ * This method differs from {@link BlockingQueue#offer(Object)} in that it
+ * will remove the oldest queued element (the element at the front of the
+ * queue) in order to make room for any new elements if the queue is full.
+ *
+ * @param e the element to add
+ * @return true since it will make room for any new elements if required
+ */
+ @Override
+ public boolean offer(E e) {
+ Objects.requireNonNull(e);
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ if (this.queue.size() == this.maxSize) {
+ final E discard = this.queue.remove();
+ this.droppedCount++;
+ LOG.debug("Queue is full. Discarding oldest element [count={}]:
{}",
+ this.droppedCount, discard);
+ }
+ this.queue.add(e);
+ this.notEmpty.signal();
+ } finally {
+ lock.unlock();
+ }
+ return true;
+ }
+
+ @Override
+ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+ long nanos = unit.toNanos(timeout);
+ final ReentrantLock lock = this.lock;
+ lock.lockInterruptibly();
+ try {
+ while (this.queue.isEmpty()) {
+ if (nanos <= 0) {
+ return null;
+ }
+ nanos = this.notEmpty.awaitNanos(nanos);
+ }
+ return this.queue.poll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public E take() throws InterruptedException {
+ final ReentrantLock lock = this.lock;
+ lock.lockInterruptibly();
+ try {
+ while (this.queue.isEmpty()) {
+ this.notEmpty.await();
+ }
+ return this.queue.poll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ return this.queue.isEmpty();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public int size() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ return this.queue.size();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns the number of elements that were dropped from the queue because
the
+ * queue was full when a new element was offered.
+ *
+ * @return The number of elements dropped (lost) from the queue
+ */
+ public long getDroppedCount() {
+ return this.droppedCount;
+ }
+
+ /**
+ * For testing purposes only.
+ *
+ * @return True if a thread is blocked waiting for a new element to be
offered
+ * to the queue
+ */
+ boolean isConsumerThreadBlocked() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ return lock.getWaitQueueLength(this.notEmpty) > 0;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public int drainTo(Collection<? super E> c) {
+ throw new UnsupportedOperationException();
+ }
+
+
+ @Override
+ public E poll() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public E element() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public E peek() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public E remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends E> arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clear() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object[] toArray() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> T[] toArray(T[] arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean add(E e) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int drainTo(Collection<? super E> c, int maxElements) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean offer(E e, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void put(E e) throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int remainingCapacity() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ throw new UnsupportedOperationException();
+ }
+
+}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/util/TestCircularBlockingQueue.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/util/TestCircularBlockingQueue.java
new file mode 100644
index 0000000..ac24d2e
--- /dev/null
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/util/TestCircularBlockingQueue.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.util;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCircularBlockingQueue {
+
+ @Test
+ public void testCircularBlockingQueue() throws InterruptedException {
+ final CircularBlockingQueue<Integer> testQueue =
+ new CircularBlockingQueue<>(2);
+
+ testQueue.offer(1);
+ testQueue.offer(2);
+ testQueue.offer(3);
+
+ Assert.assertEquals(2, testQueue.size());
+
+ Assert.assertEquals(2, testQueue.take().intValue());
+ Assert.assertEquals(3, testQueue.take().intValue());
+
+ Assert.assertEquals(1L, testQueue.getDroppedCount());
+ Assert.assertEquals(0, testQueue.size());
+ Assert.assertEquals(true, testQueue.isEmpty());
+ }
+
+ @Test(timeout = 10000L)
+ public void testCircularBlockingQueueTakeBlock()
+ throws InterruptedException, ExecutionException {
+
+ final CircularBlockingQueue<Integer> testQueue = new
CircularBlockingQueue<>(2);
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ Future<Integer> testTake = executor.submit(() -> {
+ return testQueue.take();
+ });
+
+ // Allow the other thread to get into position; waiting for item to be
+ // inserted
+ while (!testQueue.isConsumerThreadBlocked()) {
+ Thread.sleep(50L);
+ }
+
+ testQueue.offer(10);
+
+ Integer result = testTake.get();
+ Assert.assertEquals(10, result.intValue());
+ } finally {
+ executor.shutdown();
+ }
+ }
+
+}