This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e4238f6e32 Pipe: Deleted the reflection code of AtomicUpdater (#16663)
1e4238f6e32 is described below

commit 1e4238f6e32fce45706aa10aa179e7f29d1bf69d
Author: Caideyipi <[email protected]>
AuthorDate: Tue Oct 28 10:23:07 2025 +0800

    Pipe: Deleted the reflection code of AtomicUpdater (#16663)
---
 .../realtime/disruptor/MultiProducerSequencer.java | 19 ++++++++++---
 .../realtime/disruptor/SequenceGroups.java         | 32 +++++++---------------
 2 files changed, 25 insertions(+), 26 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
index d40ed968398..9aa7716a7a5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
 
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.LockSupport;
 
 /**
@@ -53,7 +55,7 @@ public final class MultiProducerSequencer {
    * modified by SequenceGroups Array reference is replaced atomically via
    * AtomicReferenceFieldUpdater
    */
-  volatile Sequence[] gatingSequences;
+  volatile AtomicReference<Sequence[]> gatingSequences = new 
AtomicReference<>();
 
   /**
    * Cached minimum gating sequence to reduce contention Updated 
opportunistically in next() to
@@ -90,7 +92,7 @@ public final class MultiProducerSequencer {
     }
 
     this.bufferSize = bufferSize;
-    this.gatingSequences = gatingSequences != null ? gatingSequences : new 
Sequence[0];
+    this.gatingSequences.set(Objects.nonNull(gatingSequences) ? 
gatingSequences : new Sequence[0]);
     this.availableBuffer = new AtomicIntegerArray(bufferSize);
     this.indexMask = bufferSize - 1;
     this.indexShift = log2(bufferSize);
@@ -123,7 +125,7 @@ public final class MultiProducerSequencer {
       final long cachedGatingSequence = gatingSequenceCache.get();
 
       if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
-        long gatingSequence = Sequence.getMinimumSequence(gatingSequences, 
current);
+        long gatingSequence = 
Sequence.getMinimumSequence(gatingSequences.get(), current);
 
         if (wrapPoint > gatingSequence) {
           LockSupport.parkNanos(1);
@@ -180,7 +182,7 @@ public final class MultiProducerSequencer {
   }
 
   public long remainingCapacity() {
-    long consumed = Sequence.getMinimumSequence(gatingSequences, cursor.get());
+    long consumed = Sequence.getMinimumSequence(gatingSequences.get(), 
cursor.get());
     long produced = cursor.get();
     return bufferSize - (produced - consumed);
   }
@@ -243,6 +245,15 @@ public final class MultiProducerSequencer {
     return ((int) sequence) & indexMask;
   }
 
+  public Sequence[] getGatingSequences() {
+    return gatingSequences.get();
+  }
+
+  public boolean compareAndSetGatingSequences(
+      final Sequence[] currentSequences, final Sequence[] updatedSequences) {
+    return gatingSequences.compareAndSet(currentSequences, updatedSequences);
+  }
+
   /**
    * Calculate log2 for index shift calculation
    *
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceGroups.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceGroups.java
index af5039070f0..dd22ee4ccdd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceGroups.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceGroups.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
 
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
 /**
  * Utility for atomic management of sequence arrays
  *
@@ -32,12 +30,6 @@ import 
java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
  */
 final class SequenceGroups {
 
-  /** Field updater for atomic array replacement */
-  private static final AtomicReferenceFieldUpdater<MultiProducerSequencer, 
Sequence[]>
-      SEQUENCE_UPDATER =
-          AtomicReferenceFieldUpdater.newUpdater(
-              MultiProducerSequencer.class, Sequence[].class, 
"gatingSequences");
-
   /**
    * Atomically add sequences to the gating sequence array
    *
@@ -51,27 +43,23 @@ final class SequenceGroups {
       final MultiProducerSequencer sequencer,
       final Sequence cursor,
       final Sequence... sequencesToAdd) {
-    long cursorSequence;
-    Sequence[] updatedSequences;
     Sequence[] currentSequences;
-
+    Sequence[] updatedSequences;
+    long cursorSequence;
     do {
-      currentSequences = sequencer.gatingSequences;
+      currentSequences = sequencer.getGatingSequences();
       updatedSequences = new Sequence[currentSequences.length + 
sequencesToAdd.length];
       System.arraycopy(currentSequences, 0, updatedSequences, 0, 
currentSequences.length);
-
       cursorSequence = cursor.get();
-
-      int index = currentSequences.length;
-      for (Sequence sequence : sequencesToAdd) {
-        sequence.set(cursorSequence);
-        updatedSequences[index++] = sequence;
+      int idx = currentSequences.length;
+      for (Sequence seq : sequencesToAdd) {
+        seq.set(cursorSequence);
+        updatedSequences[idx++] = seq;
       }
-    } while (!SEQUENCE_UPDATER.compareAndSet(sequencer, currentSequences, 
updatedSequences));
-
+    } while (!sequencer.compareAndSetGatingSequences(currentSequences, 
updatedSequences));
     cursorSequence = cursor.get();
-    for (Sequence sequence : sequencesToAdd) {
-      sequence.set(cursorSequence);
+    for (Sequence seq : sequencesToAdd) {
+      seq.set(cursorSequence);
     }
   }
 }

Reply via email to