This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch pipe-flush
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 086f9793ff22f898fd206c2b0416b908db25b845
Author: Caideyipi <[email protected]>
AuthorDate: Fri Feb 13 17:40:26 2026 +0800

    clean
---
 .../listener/PipeInsertionDataNodeListener.java    | 18 ++++++------
 .../db/storageengine/dataregion/DataRegion.java    |  8 ++++--
 .../dataregion/memtable/TsFileProcessor.java       |  8 +++---
 .../db/pipe/source/PipeRealtimeExtractTest.java    | 32 +++++++---------------
 4 files changed, 29 insertions(+), 37 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index 8fb95d78fc4..d255d80166a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -51,8 +51,8 @@ public class PipeInsertionDataNodeListener {
   private final ConcurrentMap<Integer, PipeDataRegionAssigner> dataRegionId2Assigner =
       new ConcurrentHashMap<>();
 
-  private final AtomicInteger listenToTsFileExtractorCount = new AtomicInteger(0);
-  private final AtomicInteger listenToInsertNodeExtractorCount = new AtomicInteger(0);
+  private final AtomicInteger listenToTsFileSourceCount = new AtomicInteger(0);
+  private final AtomicInteger listenToInsertNodeSourceCount = new AtomicInteger(0);
 
   //////////////////////////// start & stop ////////////////////////////
 
@@ -63,10 +63,10 @@ public class PipeInsertionDataNodeListener {
         .startAssignTo(extractor);
 
     if (extractor.isNeedListenToTsFile()) {
-      listenToTsFileExtractorCount.incrementAndGet();
+      listenToTsFileSourceCount.incrementAndGet();
     }
     if (extractor.isNeedListenToInsertNode()) {
-      listenToInsertNodeExtractorCount.incrementAndGet();
+      listenToInsertNodeSourceCount.incrementAndGet();
     }
   }
 
@@ -80,10 +80,10 @@ public class PipeInsertionDataNodeListener {
     assigner.stopAssignTo(extractor);
 
     if (extractor.isNeedListenToTsFile()) {
-      listenToTsFileExtractorCount.decrementAndGet();
+      listenToTsFileSourceCount.decrementAndGet();
     }
     if (extractor.isNeedListenToInsertNode()) {
-      listenToInsertNodeExtractorCount.decrementAndGet();
+      listenToInsertNodeSourceCount.decrementAndGet();
     }
 
     if (assigner.notMoreExtractorNeededToBeAssigned()) {
@@ -97,7 +97,7 @@ public class PipeInsertionDataNodeListener {
   //////////////////////////// listen to events ////////////////////////////
 
   public void listenToTsFile(
-      final String dataRegionId,
+      final int dataRegionId,
       final String databaseName,
       final TsFileResource tsFileResource,
       final boolean isLoaded) {
@@ -118,11 +118,11 @@ public class PipeInsertionDataNodeListener {
   }
 
   public void listenToInsertNode(
-      final String dataRegionId,
+      final int dataRegionId,
       final String databaseName,
       final InsertNode insertNode,
       final TsFileResource tsFileResource) {
-    if (listenToInsertNodeExtractorCount.get() == 0) {
+    if (listenToInsertNodeSourceCount.get() == 0) {
       return;
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 8a76f891679..7a0d5b86833 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -4157,7 +4157,7 @@ public class DataRegion implements IDataRegionForQuery {
 
     // Listen before the tsFile is added into tsFile manager to avoid it being compacted
     PipeInsertionDataNodeListener.getInstance()
-        .listenToTsFile(dataRegionIdString, databaseName, tsFileResource, true);
+        .listenToTsFile(dataRegionId.getId(), databaseName, tsFileResource, true);
 
     tsFileManager.add(tsFileResource, false);
 
@@ -4336,12 +4336,16 @@ public class DataRegion implements IDataRegionForQuery {
     return dataRegionIdString;
   }
 
+  public int getDataRegionId() {
+    return dataRegionId.getId();
+  }
+
   /**
    * Get the storageGroupPath with dataRegionId.
    *
    * @return data region path, like root.sg1/0
    */
-  public String getStorageGroupPath() {
+  public String getDatabasePath() {
     return databaseName + File.separator + dataRegionIdString;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index acdac61180b..85e0bbba055 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -340,7 +340,7 @@ public class TsFileProcessor {
     }
     PipeInsertionDataNodeListener.getInstance()
         .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionIdString(),
+            dataRegionInfo.getDataRegion().getDataRegionId(),
             dataRegionInfo.getDataRegion().getDatabaseName(),
             insertRowNode,
             tsFileResource);
@@ -437,7 +437,7 @@ public class TsFileProcessor {
     }
     PipeInsertionDataNodeListener.getInstance()
         .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionIdString(),
+            dataRegionInfo.getDataRegion().getDataRegionId(),
             dataRegionInfo.getDataRegion().getDatabaseName(),
             insertRowsNode,
             tsFileResource);
@@ -610,7 +610,7 @@ public class TsFileProcessor {
     }
     PipeInsertionDataNodeListener.getInstance()
         .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionIdString(),
+            dataRegionInfo.getDataRegion().getDataRegionId(),
             dataRegionInfo.getDataRegion().getDatabaseName(),
             insertTabletNode,
             tsFileResource);
@@ -1746,7 +1746,7 @@ public class TsFileProcessor {
     // before resource serialization to avoid missing hardlink after restart
     PipeInsertionDataNodeListener.getInstance()
         .listenToTsFile(
-            dataRegionInfo.getDataRegion().getDataRegionIdString(),
+            dataRegionInfo.getDataRegion().getDataRegionId(),
             dataRegionInfo.getDataRegion().getDatabaseName(),
             tsFileResource,
             false);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
index 59aec4b1f7b..9e07e91e983 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
@@ -70,8 +70,8 @@ public class PipeRealtimeExtractTest {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeExtractTest.class);
 
-  private final String dataRegion1 = "1";
-  private final String dataRegion2 = "2";
+  private final int dataRegion1 = 1;
+  private final int dataRegion2 = 2;
   private final String pattern1 = "root.sg.d";
   private final String pattern2 = "root.sg.d.a";
   private final String[] device = new String[] {"root", "sg", "d"};
@@ -151,31 +151,19 @@ public class PipeRealtimeExtractTest {
       final PipeTaskRuntimeConfiguration configuration0 =
           new PipeTaskRuntimeConfiguration(
               new PipeTaskSourceRuntimeEnvironment(
-                  "1",
-                  1,
-                  Integer.parseInt(dataRegion1),
-                  new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
+                  "1", 1, dataRegion1, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
       final PipeTaskRuntimeConfiguration configuration1 =
           new PipeTaskRuntimeConfiguration(
               new PipeTaskSourceRuntimeEnvironment(
-                  "1",
-                  1,
-                  Integer.parseInt(dataRegion1),
-                  new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
+                  "1", 1, dataRegion1, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
       final PipeTaskRuntimeConfiguration configuration2 =
           new PipeTaskRuntimeConfiguration(
               new PipeTaskSourceRuntimeEnvironment(
-                  "1",
-                  1,
-                  Integer.parseInt(dataRegion2),
-                  new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
+                  "1", 1, dataRegion2, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
       final PipeTaskRuntimeConfiguration configuration3 =
           new PipeTaskRuntimeConfiguration(
               new PipeTaskSourceRuntimeEnvironment(
-                  "1",
-                  1,
-                  Integer.parseInt(dataRegion2),
-                  new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
+                  "1", 1, dataRegion2, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
 
       // Some parameters of extractor are validated and initialized during the validation process.
       extractor0.validate(new PipeParameterValidator(parameters0));
@@ -274,7 +262,7 @@ public class PipeRealtimeExtractTest {
   }
 
   private Future<?> write2DataRegion(
-      final int writeNum, final String dataRegionId, final int startNum) {
+      final int writeNum, final int dataRegionId, final int startNum) {
     final File dataRegionDir =
         new File(tsFileDir.getPath() + File.separator + dataRegionId + File.separator + "0");
     final boolean ignored = dataRegionDir.mkdirs();
@@ -305,7 +293,7 @@ public class PipeRealtimeExtractTest {
             PipeInsertionDataNodeListener.getInstance()
                 .listenToInsertNode(
                     dataRegionId,
-                    dataRegionId,
+                    Integer.toString(dataRegionId),
                     new InsertRowNode(
                         new PlanNodeId(String.valueOf(i)),
                         new PartialPath(device),
@@ -319,7 +307,7 @@ public class PipeRealtimeExtractTest {
             PipeInsertionDataNodeListener.getInstance()
                 .listenToInsertNode(
                     dataRegionId,
-                    dataRegionId,
+                    Integer.toString(dataRegionId),
                     new InsertRowNode(
                         new PlanNodeId(String.valueOf(i)),
                         new PartialPath(device),
@@ -331,7 +319,7 @@ public class PipeRealtimeExtractTest {
                         false),
                     resource);
             PipeInsertionDataNodeListener.getInstance()
-                .listenToTsFile(dataRegionId, dataRegionId, resource, false);
+                .listenToTsFile(dataRegionId, Integer.toString(dataRegionId), resource, false);
           }
         });
   }

Reply via email to