This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1e4238f6e32 Pipe: Deleted the reflection code of AtomicUpdater (#16663)
1e4238f6e32 is described below
commit 1e4238f6e32fce45706aa10aa179e7f29d1bf69d
Author: Caideyipi <[email protected]>
AuthorDate: Tue Oct 28 10:23:07 2025 +0800
Pipe: Deleted the reflection code of AtomicUpdater (#16663)
---
.../realtime/disruptor/MultiProducerSequencer.java | 19 ++++++++++---
.../realtime/disruptor/SequenceGroups.java | 32 +++++++---------------
2 files changed, 25 insertions(+), 26 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
index d40ed968398..9aa7716a7a5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
/**
@@ -53,7 +55,7 @@ public final class MultiProducerSequencer {
* modified by SequenceGroups Array reference is replaced atomically via
* AtomicReferenceFieldUpdater
*/
- volatile Sequence[] gatingSequences;
+ volatile AtomicReference<Sequence[]> gatingSequences = new
AtomicReference<>();
/**
* Cached minimum gating sequence to reduce contention Updated
opportunistically in next() to
@@ -90,7 +92,7 @@ public final class MultiProducerSequencer {
}
this.bufferSize = bufferSize;
- this.gatingSequences = gatingSequences != null ? gatingSequences : new
Sequence[0];
+ this.gatingSequences.set(Objects.nonNull(gatingSequences) ?
gatingSequences : new Sequence[0]);
this.availableBuffer = new AtomicIntegerArray(bufferSize);
this.indexMask = bufferSize - 1;
this.indexShift = log2(bufferSize);
@@ -123,7 +125,7 @@ public final class MultiProducerSequencer {
final long cachedGatingSequence = gatingSequenceCache.get();
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
- long gatingSequence = Sequence.getMinimumSequence(gatingSequences,
current);
+ long gatingSequence =
Sequence.getMinimumSequence(gatingSequences.get(), current);
if (wrapPoint > gatingSequence) {
LockSupport.parkNanos(1);
@@ -180,7 +182,7 @@ public final class MultiProducerSequencer {
}
public long remainingCapacity() {
- long consumed = Sequence.getMinimumSequence(gatingSequences, cursor.get());
+ long consumed = Sequence.getMinimumSequence(gatingSequences.get(),
cursor.get());
long produced = cursor.get();
return bufferSize - (produced - consumed);
}
@@ -243,6 +245,15 @@ public final class MultiProducerSequencer {
return ((int) sequence) & indexMask;
}
+ public Sequence[] getGatingSequences() {
+ return gatingSequences.get();
+ }
+
+ public boolean compareAndSetGatingSequences(
+ final Sequence[] currentSequences, final Sequence[] updatedSequences) {
+ return gatingSequences.compareAndSet(currentSequences, updatedSequences);
+ }
+
/**
* Calculate log2 for index shift calculation
*
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceGroups.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceGroups.java
index af5039070f0..dd22ee4ccdd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceGroups.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceGroups.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
/**
* Utility for atomic management of sequence arrays
*
@@ -32,12 +30,6 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
*/
final class SequenceGroups {
- /** Field updater for atomic array replacement */
- private static final AtomicReferenceFieldUpdater<MultiProducerSequencer,
Sequence[]>
- SEQUENCE_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(
- MultiProducerSequencer.class, Sequence[].class,
"gatingSequences");
-
/**
* Atomically add sequences to the gating sequence array
*
@@ -51,27 +43,23 @@ final class SequenceGroups {
final MultiProducerSequencer sequencer,
final Sequence cursor,
final Sequence... sequencesToAdd) {
- long cursorSequence;
- Sequence[] updatedSequences;
Sequence[] currentSequences;
-
+ Sequence[] updatedSequences;
+ long cursorSequence;
do {
- currentSequences = sequencer.gatingSequences;
+ currentSequences = sequencer.getGatingSequences();
updatedSequences = new Sequence[currentSequences.length +
sequencesToAdd.length];
System.arraycopy(currentSequences, 0, updatedSequences, 0,
currentSequences.length);
-
cursorSequence = cursor.get();
-
- int index = currentSequences.length;
- for (Sequence sequence : sequencesToAdd) {
- sequence.set(cursorSequence);
- updatedSequences[index++] = sequence;
+ int idx = currentSequences.length;
+ for (Sequence seq : sequencesToAdd) {
+ seq.set(cursorSequence);
+ updatedSequences[idx++] = seq;
}
- } while (!SEQUENCE_UPDATER.compareAndSet(sequencer, currentSequences,
updatedSequences));
-
+ } while (!sequencer.compareAndSetGatingSequences(currentSequences,
updatedSequences));
cursorSequence = cursor.get();
- for (Sequence sequence : sequencesToAdd) {
- sequence.set(cursorSequence);
+ for (Sequence seq : sequencesToAdd) {
+ seq.set(cursorSequence);
}
}
}