This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch matcher-opti-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/matcher-opti-13 by this push:
new 5b06a05261a regionid
5b06a05261a is described below
commit 5b06a05261aeb6b5d195c142d9a88aece69e2c35
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 18 16:49:26 2026 +0800
regionid
---
.../db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java | 6 +++++-
.../source/dataregion/realtime/assigner/PipeDataRegionAssigner.java | 6 ++++--
2 files changed, 9 insertions(+), 3 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index f80e02f2d48..2dda8ca397f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -50,11 +50,14 @@ public class DisruptorQueue {
private final RingBuffer<EventContainer> ringBuffer;
private volatile boolean isClosed = false;
+ private final int dataRegionId;
private volatile long lastLogTime = Long.MIN_VALUE;
public DisruptorQueue(
+ final int dataRegionId,
final EventHandler<PipeRealtimeEvent> eventHandler,
final Consumer<PipeRealtimeEvent> onAssignedHook) {
+ this.dataRegionId = dataRegionId;
final PipeConfig config = PipeConfig.getInstance();
final int ringBufferSize = config.getPipeSourceAssignerDisruptorRingBufferSize();
final long ringBufferEntrySizeInBytes =
@@ -113,7 +116,8 @@ public class DisruptorQueue {
&& System.currentTimeMillis() - lastLogTime
>=
PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds()) {
LOGGER.warn(
- "The assigner queue content has exceeded half, it may be stuck and may block insertion. capacity: {}, bufferSize: {}",
+ "The assigner queue content has exceeded half, it may be stuck and may block insertion. regionId: {}, capacity: {}, bufferSize: {}",
+ dataRegionId,
capacity,
bufferSize);
lastLogTime = System.currentTimeMillis();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 6a4f877bf47..e3067a13126 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -64,7 +64,9 @@ public class PipeDataRegionAssigner implements Closeable {
public PipeDataRegionAssigner(final String dataRegionId) {
this.matcher = new CachedSchemaPatternMatcher();
- this.disruptor = new DisruptorQueue(this::assignToExtractor, this::onAssignedHook);
+ this.disruptor =
+ new DisruptorQueue(
+ Integer.parseInt(dataRegionId), this::assignToSource, this::onAssignedHook);
this.dataRegionId = dataRegionId;
PipeAssignerMetrics.getInstance().register(this);
}
@@ -105,7 +107,7 @@ public class PipeDataRegionAssigner implements Closeable {
eventCounter.decreaseEventCount(innerEvent);
}
- private void assignToExtractor(
+ private void assignToSource(
final PipeRealtimeEvent event, final long sequence, final boolean endOfBatch) {
if (disruptor.isClosed()) {
return;