This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch steve-link in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c1f477f28091766b6df2885b5a0e3ce4fb30245d Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Dec 1 21:32:06 2023 +0800 Update ConcurrentIterableLinkedQueue.java --- .../ConcurrentIterableLinkedQueue.java | 105 ++++++++++----------- 1 file changed, 50 insertions(+), 55 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueue.java index 993dd4e63f2..dbaf827274e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueue.java @@ -35,19 +35,17 @@ public class ConcurrentIterableLinkedQueue<E> { private E data; private LinkedListNode<E> next; - public LinkedListNode(E data) { + private LinkedListNode(E data) { this.data = data; this.next = null; } } private final LinkedListNode<E> dummyNode = new LinkedListNode<>(null); - private LinkedListNode<E> firstNode; - private LinkedListNode<E> lastNode; - - // The indexes of elements are (firstIndex, firstIndex + 1, ...., lastIndex - 1) - private volatile long firstIndex = 0; - private volatile long lastIndex = 0; + private volatile LinkedListNode<E> firstNode; + private volatile LinkedListNode<E> lastNode; + private volatile long firstIndex = -1; + private volatile long lastIndex = -1; private final ReentrantLock lock = new ReentrantLock(); private final Condition hasNextCondition = lock.newCondition(); @@ -60,7 +58,7 @@ public class ConcurrentIterableLinkedQueue<E> { public boolean isEmpty() { lock.lock(); try { - return firstNode == dummyNode; + return firstIndex == lastIndex; } finally { lock.unlock(); } @@ -72,18 +70,18 @@ public class ConcurrentIterableLinkedQueue<E> { } final LinkedListNode<E> newNode = new 
LinkedListNode<>(e); - lock.lock(); try { + ++lastIndex; + if (firstNode == dummyNode) { + firstIndex = lastIndex; firstNode = newNode; } lastNode.next = newNode; lastNode = newNode; - ++lastIndex; - hasNextCondition.signalAll(); } finally { lock.unlock(); @@ -91,18 +89,20 @@ public class ConcurrentIterableLinkedQueue<E> { } public void removeBefore(long newFirstIndex) { + if (newFirstIndex <= firstIndex) { + throw new IllegalArgumentException("New first index must be greater than the current first index."); + } + lock.lock(); try { newFirstIndex = Math.min(newFirstIndex, lastIndex); if (newFirstIndex <= firstIndex) { return; } - // assert firstIndex < newFirstIndex LinkedListNode<E> currentNode = firstNode; - for (long i = firstIndex; i < newFirstIndex; ++i) { - final LinkedListNode<E> nextNode = currentNode.next; + LinkedListNode<E> nextNode = currentNode.next; currentNode.data = null; currentNode.next = null; currentNode = nextNode; @@ -145,59 +145,51 @@ public class ConcurrentIterableLinkedQueue<E> { } public Iterator iterateFromEarliest() { - lock.lock(); - try { - return iterateFrom(firstIndex); - } finally { - lock.unlock(); - } + return new Iterator(); } public Iterator iterateFromLatest() { - lock.lock(); - try { - return iterateFrom(lastIndex); - } finally { - lock.unlock(); - } + return iterateFrom(lastIndex); } public Iterator iterateFrom(long index) { - lock.lock(); - try { - return new Iterator(index); - } finally { - lock.unlock(); - } + return new Iterator(index); } - /** NOTE: not thread-safe. 
*/ public class Iterator implements java.util.Iterator<E> { private LinkedListNode<E> currentNode; private long currentIndex; + private Iterator() { + lock.lock(); + try { + currentNode = dummyNode; + currentIndex = firstIndex - 1; + } finally { + lock.unlock(); + } + } + private Iterator(long index) { - seek(index); + lock.lock(); + try { + seek(index); + } finally { + lock.unlock(); + } } - /** - * Seek the {@link Iterator#currentIndex} to the closest position allowed to the given index. - * Note that one can seek to {@link ConcurrentIterableLinkedQueue#lastIndex} to subscribe the - * next incoming element. - * - * @param index the attempt index - * @return the actual new index - */ public long seek(long index) { lock.lock(); try { currentNode = firstNode; currentIndex = firstIndex; - final long targetIndex = Math.max(firstIndex, Math.min(index, lastIndex)); - for (long i = 0; i < targetIndex - firstIndex; ++i) { - moveToNext(); + index = Math.max(firstIndex, Math.min(index, lastIndex)); + while (currentIndex < index && currentNode.next != null) { + currentNode = currentNode.next; + currentIndex++; } return currentIndex; @@ -206,18 +198,11 @@ public class ConcurrentIterableLinkedQueue<E> { } } - private void moveToNext() { - if (currentNode.next != null) { - currentNode = currentNode.next; - ++currentIndex; - } - } - @Override public boolean hasNext() { lock.lock(); try { - return currentNode.next != null; + return currentNode != null && currentNode.next != null; } finally { lock.unlock(); } @@ -231,13 +216,14 @@ public class ConcurrentIterableLinkedQueue<E> { public E next(long waitTimeMillis) { lock.lock(); try { - while (!hasNext()) { + while (currentNode.next == null) { if (!hasNextCondition.await(waitTimeMillis, TimeUnit.MILLISECONDS)) { return null; } } - moveToNext(); + currentNode = currentNode.next; + currentIndex++; return currentNode.data; } catch (InterruptedException e) { @@ -248,6 +234,15 @@ public class ConcurrentIterableLinkedQueue<E> { } } + 
public E getCurrent() { + lock.lock(); + try { + return currentNode.data; + } finally { + lock.unlock(); + } + } + public long getCurrentIndex() { lock.lock(); try {
