This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 2739eb6  RATIS-1434. Add iterator() and remove(..) methods to 
DataQueue and DataBlockingQueue (#533)
2739eb6 is described below

commit 2739eb6883458395aef1f901e3e4b6825f02d658
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Nov 15 13:26:45 2021 +0800

    RATIS-1434. Add iterator() and remove(..) methods to DataQueue and 
DataBlockingQueue (#533)
---
 .../org/apache/ratis/util/DataBlockingQueue.java   | 11 +++++
 .../main/java/org/apache/ratis/util/DataQueue.java | 35 +++++++++++++---
 .../server/metrics/SegmentedRaftLogMetrics.java    |  5 +--
 .../java/org/apache/ratis/util/TestDataQueue.java  | 47 ++++++++++++++++++++++
 4 files changed, 89 insertions(+), 9 deletions(-)

diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java 
b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
index d6c14ab..842b8f1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
@@ -109,6 +109,17 @@ public class DataBlockingQueue<E> extends DataQueue<E> {
   }
 
   @Override
+  public boolean remove(E e) {
+    try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
+      final boolean removed = super.remove(e);
+      if (removed) {
+        notFull.signal();
+      }
+      return removed;
+    }
+  }
+
+  @Override
   public E poll() {
     try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
       final E polled = super.poll();
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java 
b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
index c4419d4..3db06f5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
@@ -22,9 +22,10 @@ import org.apache.ratis.util.function.TriConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -41,7 +42,7 @@ import java.util.function.ToLongFunction;
  *
  * This class is NOT threadsafe.
  */
-public class DataQueue<E> {
+public class DataQueue<E> implements Iterable<E> {
   public static final Logger LOG = LoggerFactory.getLogger(DataQueue.class);
 
   private final Object name;
@@ -59,7 +60,7 @@ public class DataQueue<E> {
     this.byteLimit = byteLimit.getSize();
     this.elementLimit = elementLimit;
     this.getNumBytes = getNumBytes;
-    this.q = new ArrayDeque<>(elementLimit);
+    this.q = new LinkedList<>();
   }
 
   public int getElementLimit() {
@@ -78,10 +79,12 @@ public class DataQueue<E> {
     return q.size();
   }
 
+  /** The same as {@link java.util.Collection#isEmpty()}. */
   public final boolean isEmpty() {
     return getNumElements() == 0;
   }
 
+  /** The same as {@link java.util.Collection#clear()}. */
   public void clear() {
     q.clear();
     numBytes = 0;
@@ -151,7 +154,29 @@ public class DataQueue<E> {
     return polled;
   }
 
-  public int size(){
-    return q.size();
+  /** The same as {@link java.util.Collection#remove(Object)}. */
+  public boolean remove(E e) {
+    final boolean removed = q.remove(e);
+    if (removed) {
+      numBytes -= getNumBytes.applyAsLong(e);
+    }
+    return removed;
+  }
+
+  @Override
+  public Iterator<E> iterator() {
+    final Iterator<E> i = q.iterator();
+    // Do not support the remove() method.
+    return new Iterator<E>() {
+      @Override
+      public boolean hasNext() {
+        return i.hasNext();
+      }
+
+      @Override
+      public E next() {
+        return i.next();
+      }
+    };
   }
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java
index 1282403..e513d94 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java
@@ -81,10 +81,7 @@ public class SegmentedRaftLogMetrics extends 
RaftLogMetricsBase {
   }
 
   public void addDataQueueSizeGauge(DataQueue queue) {
-    registry.gauge(RAFT_LOG_DATA_QUEUE_SIZE, () -> () -> {
-      //q.size() is O(1) operation
-      return queue.size();
-    });
+    registry.gauge(RAFT_LOG_DATA_QUEUE_SIZE, () -> queue::getNumElements);
   }
 
   public void addClosedSegmentsNum(SegmentedRaftLogCache cache) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java 
b/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
index 846517e..e629b16 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
@@ -21,6 +21,9 @@ import org.apache.ratis.util.function.TriConsumer;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 
@@ -134,6 +137,50 @@ public class TestDataQueue {
   }
 
   @Test(timeout = 1000)
+  public void testIteratorAndRemove() {
+    runTestIteratorAndRemove(q);
+  }
+
+  static void runTestIteratorAndRemove(DataQueue<Long> q) {
+    assertSizes(0, 0, q);
+
+    final int elementLimit = q.getElementLimit();
+    int numElements = 0;
+    int numBytes = 0;
+    for(long i = 0; i < elementLimit; i++) {
+      final boolean offered = q.offer(i);
+      Assert.assertTrue(offered);
+      numElements++;
+      numBytes += i;
+      assertSizes(numElements, numBytes, q);
+    }
+
+    { // test iterator()
+      final Iterator<Long> i = q.iterator();
+      for (long expected = 0; expected < elementLimit; expected++) {
+        Assert.assertEquals(expected, i.next().longValue());
+      }
+    }
+
+    { // test remove(..)
+      final List<Long> toRemoves = new ArrayList<>(elementLimit);
+      for (long i = 0; i < elementLimit; i++) {
+        toRemoves.add(i);
+      }
+      Collections.shuffle(toRemoves);
+
+      for (Long r : toRemoves) {
+        q.remove(r);
+        numElements--;
+        numBytes -= r;
+        assertSizes(numElements, numBytes, q);
+      }
+    }
+
+    assertSizes(0, 0, q);
+  }
+
+  @Test(timeout = 1000)
   public void testTimeout() {
     assertSizes(0, 0, q);
 

Reply via email to