This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e2bd04d87c4 Pipe: Fix iterator.hasNext() == true after removal from empty ConcurrentIterableLinkedQueue (#12514)
e2bd04d87c4 is described below

commit e2bd04d87c45f9639f6efef5beb8c1ebc6942893
Author: Caideyipi <[email protected]>
AuthorDate: Mon May 13 15:17:37 2024 +0800

    Pipe: Fix iterator.hasNext() == true after removal from empty ConcurrentIterableLinkedQueue (#12514)
---
 .../queue/ConcurrentIterableLinkedQueue.java       | 20 +++---
 .../ConcurrentIterableLinkedQueueTest.java         | 84 ++++++++++++----------
 2 files changed, 59 insertions(+), 45 deletions(-)

diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/ConcurrentIterableLinkedQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/ConcurrentIterableLinkedQueue.java
index 7672ef73371..78a8c1a75fa 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/ConcurrentIterableLinkedQueue.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/ConcurrentIterableLinkedQueue.java
@@ -40,7 +40,7 @@ public class ConcurrentIterableLinkedQueue<E> {
     private E data;
     private LinkedListNode<E> next;
 
-    private LinkedListNode(E data) {
+    private LinkedListNode(final E data) {
       this.data = data;
       this.next = null;
     }
@@ -65,7 +65,7 @@ public class ConcurrentIterableLinkedQueue<E> {
    *
    * @param e the element to be added, which cannot be {@code null}
    */
-  public void add(E e) {
+  public void add(final E e) {
     if (e == null) {
       throw new IllegalArgumentException("Null element is not allowed.");
     }
@@ -125,7 +125,11 @@ public class ConcurrentIterableLinkedQueue<E> {
       }
 
       firstNode = currentNode;
-      pilotNode.next = firstNode;
+      // pilotNode.next shall be null when the queue is empty and firstNode == pilotNode
+      // to make iterator.hasNext() == false when the iterator is on the pilotNode
+      if (firstNode != pilotNode) {
+        pilotNode.next = firstNode;
+      }
 
       // Reset firstNode and lastNode to pilotNode if the queue becomes empty
       if (firstNode == null) {
@@ -200,7 +204,7 @@ public class ConcurrentIterableLinkedQueue<E> {
     }
   }
 
-  public void setFirstIndex(long firstIndex) {
+  public void setFirstIndex(final long firstIndex) {
     lock.writeLock().lock();
     try {
       this.firstIndex = firstIndex;
@@ -218,7 +222,7 @@ public class ConcurrentIterableLinkedQueue<E> {
    * If the queue is empty, the given index is valid if it is equal to {@link
    * ConcurrentIterableLinkedQueue#firstIndex}.
    */
-  public boolean isNextIndexValid(long nextIndex) {
+  public boolean isNextIndexValid(final long nextIndex) {
     lock.readLock().lock();
     try {
       return firstIndex <= nextIndex && nextIndex <= tailIndex;
@@ -231,7 +235,7 @@ public class ConcurrentIterableLinkedQueue<E> {
     return !iteratorSet.isEmpty();
   }
 
-  public DynamicIterator iterateFrom(long offset) {
+  public DynamicIterator iterateFrom(final long offset) {
     final DynamicIterator iterator = new DynamicIterator(offset);
     iteratorSet.put(iterator, iterator);
     return iterator;
@@ -289,7 +293,7 @@ public class ConcurrentIterableLinkedQueue<E> {
      * @return the next element in the queue. {@code null} if the queue is closed, or if the waiting
      *     time elapsed, or the thread is interrupted
      */
-    public E next(long waitTimeMillis) {
+    public E next(final long waitTimeMillis) {
       lock.writeLock().lock();
       try {
         while (!hasNext()) {
@@ -306,7 +310,7 @@ public class ConcurrentIterableLinkedQueue<E> {
         ++nextIndex;
 
         return currentNode.data;
-      } catch (InterruptedException e) {
+      } catch (final InterruptedException e) {
         Thread.currentThread().interrupt();
         LOGGER.warn("Interrupted while waiting for next element.", e);
         return null;
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueueTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueueTest.java
index f04bc41a984..a7a7310b113 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueueTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueueTest.java
@@ -40,7 +40,7 @@ import static org.junit.Assert.assertFalse;
 
 public class ConcurrentIterableLinkedQueueTest {
 
-  ConcurrentIterableLinkedQueue<Integer> queue;
+  private ConcurrentIterableLinkedQueue<Integer> queue;
 
   @Before
   public void setUp() {
@@ -60,7 +60,7 @@ public class ConcurrentIterableLinkedQueueTest {
     queue.add(2);
     queue.add(3);
 
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(0);
+    final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(0);
     itr.seek(2);
     Assert.assertEquals(Integer.valueOf(3), itr.next());
   }
@@ -68,7 +68,7 @@ public class ConcurrentIterableLinkedQueueTest {
   @Test(timeout = 60000)
   public void testTimedGet() {
     queue.add(1);
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFromEarliest();
+    final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFromEarliest();
     Assert.assertEquals(1, (int) itr.next(1000));
     Assert.assertNull(itr.next(1000));
   }
@@ -80,8 +80,8 @@ public class ConcurrentIterableLinkedQueueTest {
 
   @Test(timeout = 60000)
   public void testConcurrentAddAndRemove() throws InterruptedException {
-    int numberOfAdds = 500;
-    ExecutorService executor = Executors.newFixedThreadPool(2);
+    final int numberOfAdds = 500;
+    final ExecutorService executor = Executors.newFixedThreadPool(2);
 
     // Thread 1 adds elements to the queue
     executor.submit(
@@ -110,9 +110,9 @@ public class ConcurrentIterableLinkedQueueTest {
 
   @Test(timeout = 60000)
   public void testIterateFromEmptyQueue() {
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(1);
+    final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(1);
 
-    AtomicInteger value = new AtomicInteger(-1);
+    final AtomicInteger value = new AtomicInteger(-1);
     new Thread(() -> value.set(itr.next())).start();
     queue.add(3);
     Awaitility.await().untilAsserted(() -> Assert.assertEquals(3, 
value.get()));
@@ -120,8 +120,8 @@ public class ConcurrentIterableLinkedQueueTest {
 
   @Test(timeout = 60000)
   public void testContinuousEmptyNext() throws InterruptedException {
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(0);
-    AtomicInteger consumedValue = new AtomicInteger(0);
+    final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(0);
+    final AtomicInteger consumedValue = new AtomicInteger(0);
     new Thread(
             () -> {
               while (true) {
@@ -143,7 +143,7 @@ public class ConcurrentIterableLinkedQueueTest {
   public void testRemove() {
     queue.add(1);
     queue.add(2);
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(1);
+    final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(1);
 
     Assert.assertEquals(1, queue.tryRemoveBefore(Long.MAX_VALUE));
     Assert.assertEquals(2, (int) itr.next());
@@ -153,7 +153,7 @@ public class ConcurrentIterableLinkedQueueTest {
   public void testRemoveAgainstNewestItr() {
     queue.add(1);
     queue.add(2);
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFromLatest();
+    final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFromLatest();
 
     Assert.assertEquals(2, queue.tryRemoveBefore(Long.MAX_VALUE));
     queue.add(3);
@@ -164,7 +164,7 @@ public class ConcurrentIterableLinkedQueueTest {
   public void testClear() {
     queue.add(1);
     queue.add(2);
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(1);
+    final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(1);
     queue.clear();
 
     assertFalse(queue.hasAnyIterators());
@@ -186,12 +186,12 @@ public class ConcurrentIterableLinkedQueueTest {
     Assert.assertEquals(1, queue.getFirstIndex());
     Assert.assertEquals(2, queue.getTailIndex());
 
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator it = 
queue.iterateFromEarliest();
+    final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator it = 
queue.iterateFromEarliest();
     Assert.assertEquals(2, (int) it.next());
 
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator it2 = 
queue.iterateFromLatest();
+    final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator it2 = 
queue.iterateFromLatest();
     Assert.assertEquals(2, it2.getNextIndex());
-    AtomicInteger value = new AtomicInteger(-1);
+    final AtomicInteger value = new AtomicInteger(-1);
     new Thread(() -> value.set(it2.next())).start();
     queue.add(3);
     Awaitility.await().untilAsserted(() -> Assert.assertEquals(3, 
value.get()));
@@ -206,10 +206,10 @@ public class ConcurrentIterableLinkedQueueTest {
 
   @Test(timeout = 60000)
   public void testConcurrentReadWrite() {
-    AtomicBoolean failure = new AtomicBoolean(false);
-    List<Thread> threadList = new ArrayList<>(102);
+    final AtomicBoolean failure = new AtomicBoolean(false);
+    final List<Thread> threadList = new ArrayList<>(102);
 
-    Thread thread1 =
+    final Thread thread1 =
         new Thread(
             () -> {
               try {
@@ -223,7 +223,7 @@ public class ConcurrentIterableLinkedQueueTest {
     threadList.add(thread1);
     thread1.start();
 
-    Thread thread2 =
+    final Thread thread2 =
         new Thread(
             () -> {
               try {
@@ -238,7 +238,7 @@ public class ConcurrentIterableLinkedQueueTest {
     thread2.start();
 
     for (int i = 0; i < 100; ++i) {
-      Thread thread =
+      final Thread thread =
           new Thread(
               () -> {
                 try {
@@ -282,13 +282,13 @@ public class ConcurrentIterableLinkedQueueTest {
   @Test(timeout = 60000)
   public void testBoundaryConditions() {
     queue.add(1);
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(10);
+    final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(10);
     Assert.assertFalse(itr.hasNext());
   }
 
   @Test(timeout = 60000)
   public void testConcurrentExceptionHandling() throws InterruptedException {
-    ExecutorService executor = Executors.newFixedThreadPool(2);
+    final ExecutorService executor = Executors.newFixedThreadPool(2);
 
     executor.submit(
         () -> {
@@ -301,11 +301,11 @@ public class ConcurrentIterableLinkedQueueTest {
           queue.clear();
         });
 
-    AtomicBoolean caughtException = new AtomicBoolean(false);
+    final AtomicBoolean caughtException = new AtomicBoolean(false);
     executor.submit(
         () -> {
           try {
-            ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
+            final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
                 queue.iterateFromEarliest();
             while (itr.hasNext()) {
               itr.next();
@@ -327,7 +327,7 @@ public class ConcurrentIterableLinkedQueueTest {
       queue.add(i);
     }
 
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(0);
+    final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(0);
     for (int i = 0; i < numberOfElements; i++) {
       Assert.assertTrue(itr.hasNext());
       Assert.assertEquals(Integer.valueOf(i), itr.next());
@@ -337,8 +337,8 @@ public class ConcurrentIterableLinkedQueueTest {
 
   @Test(timeout = 60000)
   public void testMultiThreadedConsistency() throws InterruptedException {
-    int numberOfElements = 1000;
-    ExecutorService executor = Executors.newFixedThreadPool(10);
+    final int numberOfElements = 1000;
+    final ExecutorService executor = Executors.newFixedThreadPool(10);
 
     for (int i = 0; i < numberOfElements; i++) {
       int finalI = i;
@@ -348,8 +348,8 @@ public class ConcurrentIterableLinkedQueueTest {
     executor.shutdown();
     Assert.assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES));
 
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFromEarliest();
-    HashSet<Integer> elements = new HashSet<>();
+    final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFromEarliest();
+    final HashSet<Integer> elements = new HashSet<>();
     while (itr.hasNext()) {
       elements.add(itr.next());
     }
@@ -363,7 +363,7 @@ public class ConcurrentIterableLinkedQueueTest {
       queue.add(i);
     }
 
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(0);
+    final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(0);
     for (int i = 0; i < 5; i++) {
       Assert.assertTrue(itr.hasNext());
       Assert.assertEquals(Integer.valueOf(i), itr.next());
@@ -378,7 +378,7 @@ public class ConcurrentIterableLinkedQueueTest {
     }
 
     queue.tryRemoveBefore(3);
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(0);
+    final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(0);
     for (int i = 3; i < 5; i++) {
       Assert.assertTrue(itr.hasNext());
       Assert.assertEquals(Integer.valueOf(i), itr.next());
@@ -392,8 +392,8 @@ public class ConcurrentIterableLinkedQueueTest {
       queue.add(i);
     }
 
-    ExecutorService executor = Executors.newFixedThreadPool(10);
-    AtomicInteger count = new AtomicInteger(0);
+    final ExecutorService executor = Executors.newFixedThreadPool(10);
+    final AtomicInteger count = new AtomicInteger(0);
 
     for (int i = 0; i < 10; i++) {
       executor.submit(
@@ -417,7 +417,7 @@ public class ConcurrentIterableLinkedQueueTest {
       queue.add(i);
     }
 
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(0);
+    final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(0);
     for (int i = 0; i < 3; i++) {
       Assert.assertTrue(itr.hasNext());
       Assert.assertEquals(Integer.valueOf(i), itr.next());
@@ -447,10 +447,20 @@ public class ConcurrentIterableLinkedQueueTest {
     Assert.assertFalse(itr.hasNext());
   }
 
+  @Test(timeout = 60000)
+  public void testIterateAfterRemoveFromEmptyQueue() {
+    final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0);
+    Assert.assertFalse(itr.hasNext());
+    queue.tryRemoveBefore(0);
+    Assert.assertFalse(itr.hasNext());
+    itr.next(10);
+    Assert.assertEquals(0, itr.getNextIndex());
+  }
+
   @Test(timeout = 60000)
   public void testIteratorExceptionHandling() {
     queue.add(1);
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(0);
+    final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(0);
     queue.clear();
 
     Assert.assertFalse(itr.hasNext());
@@ -464,8 +474,8 @@ public class ConcurrentIterableLinkedQueueTest {
       queue.add(i);
     }
 
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(0);
-    long newNextIndex = itr.seek(5);
+    final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(0);
+    final long newNextIndex = itr.seek(5);
     Assert.assertEquals(5, newNextIndex);
     Assert.assertTrue(itr.hasNext());
     Assert.assertEquals(Integer.valueOf(5), itr.next());

Reply via email to