This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch matcher-opti-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/matcher-opti-13 by this push:
     new 5b06a05261a regionid
5b06a05261a is described below

commit 5b06a05261aeb6b5d195c142d9a88aece69e2c35
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 18 16:49:26 2026 +0800

    regionid
---
 .../db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java | 6 +++++-
 .../source/dataregion/realtime/assigner/PipeDataRegionAssigner.java | 6 ++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index f80e02f2d48..2dda8ca397f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -50,11 +50,14 @@ public class DisruptorQueue {
   private final RingBuffer<EventContainer> ringBuffer;
   private volatile boolean isClosed = false;
 
+  private final int dataRegionId;
   private volatile long lastLogTime = Long.MIN_VALUE;
 
   public DisruptorQueue(
+      final int dataRegionId,
       final EventHandler<PipeRealtimeEvent> eventHandler,
       final Consumer<PipeRealtimeEvent> onAssignedHook) {
+    this.dataRegionId = dataRegionId;
     final PipeConfig config = PipeConfig.getInstance();
     final int ringBufferSize = config.getPipeSourceAssignerDisruptorRingBufferSize();
     final long ringBufferEntrySizeInBytes =
@@ -113,7 +116,8 @@ public class DisruptorQueue {
         && System.currentTimeMillis() - lastLogTime
             >= PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds()) {
       LOGGER.warn(
-          "The assigner queue content has exceeded half, it may be stuck and may block insertion. capacity: {}, bufferSize: {}",
+          "The assigner queue content has exceeded half, it may be stuck and may block insertion. regionId: {}, capacity: {}, bufferSize: {}",
+          dataRegionId,
           capacity,
           bufferSize);
       lastLogTime = System.currentTimeMillis();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 6a4f877bf47..e3067a13126 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -64,7 +64,9 @@ public class PipeDataRegionAssigner implements Closeable {
 
   public PipeDataRegionAssigner(final String dataRegionId) {
     this.matcher = new CachedSchemaPatternMatcher();
-    this.disruptor = new DisruptorQueue(this::assignToExtractor, this::onAssignedHook);
+    this.disruptor =
+        new DisruptorQueue(
+            Integer.parseInt(dataRegionId), this::assignToSource, this::onAssignedHook);
     this.dataRegionId = dataRegionId;
     PipeAssignerMetrics.getInstance().register(this);
   }
@@ -105,7 +107,7 @@ public class PipeDataRegionAssigner implements Closeable {
     eventCounter.decreaseEventCount(innerEvent);
   }
 
-  private void assignToExtractor(
+  private void assignToSource(
       final PipeRealtimeEvent event, final long sequence, final boolean endOfBatch) {
     if (disruptor.isClosed()) {
       return;

Reply via email to