This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 2739eb6 RATIS-1434. Add iterator() and remove(..) methods to
DataQueue and DataBlockingQueue (#533)
2739eb6 is described below
commit 2739eb6883458395aef1f901e3e4b6825f02d658
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Nov 15 13:26:45 2021 +0800
RATIS-1434. Add iterator() and remove(..) methods to DataQueue and
DataBlockingQueue (#533)
---
.../org/apache/ratis/util/DataBlockingQueue.java | 11 +++++
.../main/java/org/apache/ratis/util/DataQueue.java | 35 +++++++++++++---
.../server/metrics/SegmentedRaftLogMetrics.java | 5 +--
.../java/org/apache/ratis/util/TestDataQueue.java | 47 ++++++++++++++++++++++
4 files changed, 89 insertions(+), 9 deletions(-)
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
index d6c14ab..842b8f1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
@@ -109,6 +109,17 @@ public class DataBlockingQueue<E> extends DataQueue<E> {
}
@Override
+ public boolean remove(E e) {
+ try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
+ final boolean removed = super.remove(e);
+ if (removed) {
+ notFull.signal();
+ }
+ return removed;
+ }
+ }
+
+ @Override
public E poll() {
try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
final E polled = super.poll();
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
index c4419d4..3db06f5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
@@ -22,9 +22,10 @@ import org.apache.ratis.util.function.TriConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -41,7 +42,7 @@ import java.util.function.ToLongFunction;
*
* This class is NOT threadsafe.
*/
-public class DataQueue<E> {
+public class DataQueue<E> implements Iterable<E> {
public static final Logger LOG = LoggerFactory.getLogger(DataQueue.class);
private final Object name;
@@ -59,7 +60,7 @@ public class DataQueue<E> {
this.byteLimit = byteLimit.getSize();
this.elementLimit = elementLimit;
this.getNumBytes = getNumBytes;
- this.q = new ArrayDeque<>(elementLimit);
+ this.q = new LinkedList<>();
}
public int getElementLimit() {
@@ -78,10 +79,12 @@ public class DataQueue<E> {
return q.size();
}
+ /** The same as {@link java.util.Collection#isEmpty()}. */
public final boolean isEmpty() {
return getNumElements() == 0;
}
+ /** The same as {@link java.util.Collection#clear()}. */
public void clear() {
q.clear();
numBytes = 0;
@@ -151,7 +154,29 @@ public class DataQueue<E> {
return polled;
}
- public int size(){
- return q.size();
+ /** The same as {@link java.util.Collection#remove(Object)}. */
+ public boolean remove(E e) {
+ final boolean removed = q.remove(e);
+ if (removed) {
+ numBytes -= getNumBytes.applyAsLong(e);
+ }
+ return removed;
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ final Iterator<E> i = q.iterator();
+ // Do not support the remove() method.
+ return new Iterator<E>() {
+ @Override
+ public boolean hasNext() {
+ return i.hasNext();
+ }
+
+ @Override
+ public E next() {
+ return i.next();
+ }
+ };
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java
index 1282403..e513d94 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java
@@ -81,10 +81,7 @@ public class SegmentedRaftLogMetrics extends
RaftLogMetricsBase {
}
public void addDataQueueSizeGauge(DataQueue queue) {
- registry.gauge(RAFT_LOG_DATA_QUEUE_SIZE, () -> () -> {
- //q.size() is O(1) operation
- return queue.size();
- });
+ registry.gauge(RAFT_LOG_DATA_QUEUE_SIZE, () -> queue::getNumElements);
}
public void addClosedSegmentsNum(SegmentedRaftLogCache cache) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
b/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
index 846517e..e629b16 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
@@ -21,6 +21,9 @@ import org.apache.ratis.util.function.TriConsumer;
import org.junit.Assert;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
@@ -134,6 +137,50 @@ public class TestDataQueue {
}
@Test(timeout = 1000)
+ public void testIteratorAndRemove() {
+ runTestIteratorAndRemove(q);
+ }
+
+ static void runTestIteratorAndRemove(DataQueue<Long> q) {
+ assertSizes(0, 0, q);
+
+ final int elementLimit = q.getElementLimit();
+ int numElements = 0;
+ int numBytes = 0;
+ for(long i = 0; i < elementLimit; i++) {
+ final boolean offered = q.offer(i);
+ Assert.assertTrue(offered);
+ numElements++;
+ numBytes += i;
+ assertSizes(numElements, numBytes, q);
+ }
+
+ { // test iterator()
+ final Iterator<Long> i = q.iterator();
+ for (long expected = 0; expected < elementLimit; expected++) {
+ Assert.assertEquals(expected, i.next().longValue());
+ }
+ }
+
+ { // test remove(..)
+ final List<Long> toRemoves = new ArrayList<>(elementLimit);
+ for (long i = 0; i < elementLimit; i++) {
+ toRemoves.add(i);
+ }
+ Collections.shuffle(toRemoves);
+
+ for (Long r : toRemoves) {
+ q.remove(r);
+ numElements--;
+ numBytes -= r;
+ assertSizes(numElements, numBytes, q);
+ }
+ }
+
+ assertSizes(0, 0, q);
+ }
+
+ @Test(timeout = 1000)
public void testTimeout() {
assertSizes(0, 0, q);