This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch pipe-flush
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8eac942c9984c03f2b4713bc75ecbff5fae23615
Author: Caideyipi <[email protected]>
AuthorDate: Fri Feb 13 17:22:54 2026 +0800

    IT-fix
---
 .../it/env/cluster/config/MppCommonConfig.java     |  7 +++
 .../env/cluster/config/MppSharedCommonConfig.java  |  7 +++
 .../it/env/remote/config/RemoteCommonConfig.java   |  5 ++
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |  2 +
 .../treemodel/auto/basic/IoTDBPipeSourceIT.java    | 53 +++++++++++++++++++++-
 .../agent/task/subtask/sink/PipeSinkSubtask.java   |  3 +-
 .../pipe/consensus/deletion/DeletionResource.java  |  9 ++--
 .../deletion/DeletionResourceManager.java          |  8 ++--
 .../deletion/persist/PageCacheDeletionBuffer.java  |  4 +-
 .../consensus/deletion/recover/DeletionReader.java |  4 +-
 .../event/common/heartbeat/PipeHeartbeatEvent.java |  6 +--
 .../event/realtime/PipeRealtimeEventFactory.java   |  2 +-
 .../db/pipe/metric/source/PipeAssignerMetrics.java | 32 ++++++-------
 ...istoricalDataRegionTsFileAndDeletionSource.java |  4 +-
 .../realtime/PipeRealtimeDataRegionSource.java     |  4 +-
 .../realtime/assigner/PipeDataRegionAssigner.java  |  8 ++--
 .../listener/PipeInsertionDataNodeListener.java    |  8 ++--
 .../db/storageengine/dataregion/DataRegion.java    | 11 +++--
 .../db/pipe/consensus/DeletionRecoverTest.java     |  6 +--
 .../db/pipe/consensus/DeletionResourceTest.java    | 22 ++++-----
 20 files changed, 137 insertions(+), 68 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 53aa395491b..f2b788b772f 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -465,6 +465,13 @@ public class MppCommonConfig extends MppBaseConfig 
implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setPipeTsFileFlushIntervalSeconds(long 
pipeTsFileFlushIntervalSeconds) {
+    setProperty(
+        "pipe_tsfile_flush_interval_seconds", 
String.valueOf(pipeTsFileFlushIntervalSeconds));
+    return this;
+  }
+
   @Override
   public CommonConfig setIsPipeEnableMemoryCheck(boolean 
isPipeEnableMemoryCheck) {
     setProperty("pipe_enable_memory_checked", 
String.valueOf(isPipeEnableMemoryCheck));
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index ed8a8943303..88a85e27f46 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -479,6 +479,13 @@ public class MppSharedCommonConfig implements CommonConfig 
{
     return this;
   }
 
+  @Override
+  public CommonConfig setPipeTsFileFlushIntervalSeconds(long 
pipeTsFileFlushIntervalSeconds) {
+    dnConfig.setPipeTsFileFlushIntervalSeconds(pipeTsFileFlushIntervalSeconds);
+    cnConfig.setPipeTsFileFlushIntervalSeconds(pipeTsFileFlushIntervalSeconds);
+    return this;
+  }
+
   @Override
   public CommonConfig setIsPipeEnableMemoryCheck(boolean 
isPipeEnableMemoryCheck) {
     dnConfig.setIsPipeEnableMemoryCheck(isPipeEnableMemoryCheck);
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 0c05af17f6a..31fbe9e78fb 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -338,6 +338,11 @@ public class RemoteCommonConfig implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setPipeTsFileFlushIntervalSeconds(long 
pipeTsFileFlushIntervalSeconds) {
+    return this;
+  }
+
   @Override
   public CommonConfig setIsPipeEnableMemoryCheck(boolean 
isPipeEnableMemoryCheck) {
     return this;
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 95ac239e89b..92a6af5851d 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -150,6 +150,8 @@ public interface CommonConfig {
 
   CommonConfig setPipeMemoryManagementEnabled(boolean 
pipeMemoryManagementEnabled);
 
+  CommonConfig setPipeTsFileFlushIntervalSeconds(long 
pipeTsFileFlushIntervalSeconds);
+
   CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck);
 
   CommonConfig setPipeAirGapReceiverEnabled(boolean 
isPipeAirGapReceiverEnabled);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
index a45115f5a49..3351958d2a1 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
@@ -74,6 +74,7 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeDualTreeModelAutoIT {
         .setEnableUnseqSpaceCompaction(false)
         .setEnableCrossSpaceCompaction(false)
         .setPipeMemoryManagementEnabled(false)
+        .setPipeTsFileFlushIntervalSeconds(10)
         .setIsPipeEnableMemoryCheck(false)
         .setPipeAutoSplitFullEnabled(false);
     
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
@@ -323,7 +324,7 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeDualTreeModelAutoIT {
   }
 
   @Test
-  public void testExtractorInvalidParameter() throws Exception {
+  public void testSourceInvalidParameter() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
     final String receiverIp = receiverDataNode.getIp();
@@ -608,6 +609,56 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeDualTreeModelAutoIT {
     }
   }
 
+  @Test
+  public void testSourceAutoFlush() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final Map<String, String> sourceAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> sinkAttributes = new HashMap<>();
+
+      sourceAttributes.put("source.realtime.mode", "batch");
+      sourceAttributes.put("user", "root");
+
+      sinkAttributes.put("sink", "iotdb-thrift-sink");
+      sinkAttributes.put("sink.ip", receiverIp);
+      sinkAttributes.put("sink.port", Integer.toString(receiverPort));
+
+      TestUtils.executeNonQueries(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db1.d1 (time, at1) values (1, 10)",
+              "insert into root.db1.d2 (time, at1) values (1, 20)"),
+          null);
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", sinkAttributes)
+                  .setExtractorAttributes(sourceAttributes)
+                  .setProcessorAttributes(processorAttributes));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      TestUtils.executeNonQueries(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db1.d1 (time, at1) values (2, 10)",
+              "insert into root.db1.d2 (time, at1) values (2, 20)"),
+          null);
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "select count(*) from root.db*.**",
+          "count(root.db1.d1.at1),count(root.db1.d2.at1),",
+          Collections.singleton("2,2,"),
+          60);
+    }
+  }
+
   @Test
   public void testHistoryAndRealtime() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 96c519ce636..91bccc6cbda 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -62,8 +62,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
   // to trigger the general event transfer function, causing potentially such 
as
   // the random delay of the batch transmission. Therefore, here we inject 
cron events
   // when no event can be pulled.
-  public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT =
-      new PipeHeartbeatEvent("cron", false);
+  public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT = new 
PipeHeartbeatEvent(-1, false);
 
   public PipeSinkSubtask(
       final String taskID,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
index 3bf5c1287b7..520b57a229f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
@@ -58,15 +58,12 @@ public class DeletionResource implements PersistentResource 
{
   private volatile Exception cause;
 
   public DeletionResource(
-      AbstractDeleteDataNode deleteDataNode,
-      Consumer<DeletionResource> removeHook,
-      String regionId) {
+      AbstractDeleteDataNode deleteDataNode, Consumer<DeletionResource> 
removeHook, int regionId) {
     this.deleteDataNode = deleteDataNode;
     this.removeHook = removeHook;
     this.currentStatus = Status.RUNNING;
     this.consensusGroupId =
-        ConsensusGroupId.Factory.create(
-            TConsensusGroupType.DataRegion.getValue(), 
Integer.parseInt(regionId));
+        
ConsensusGroupId.Factory.create(TConsensusGroupType.DataRegion.getValue(), 
regionId);
     this.pipeTaskReferenceCount =
         new AtomicInteger(
             
DataRegionConsensusImpl.getInstance().getReplicationNum(consensusGroupId) - 1);
@@ -151,7 +148,7 @@ public class DeletionResource implements PersistentResource 
{
   }
 
   public static DeletionResource deserialize(
-      final ByteBuffer buffer, final String regionId, final 
Consumer<DeletionResource> removeHook)
+      final ByteBuffer buffer, final int regionId, final 
Consumer<DeletionResource> removeHook)
       throws IOException {
     AbstractDeleteDataNode node = DeleteNodeType.deserializeFromDAL(buffer);
     return new DeletionResource(node, removeHook, regionId);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
index ff868ffd445..91706fbf54f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
@@ -61,7 +61,7 @@ public class DeletionResourceManager implements AutoCloseable 
{
       String.format(
           "^_(?<%s>\\d+)-(?<%s>\\d+)\\%s$",
           REBOOT_TIME, MEM_TABLE_FLUSH_ORDER, DELETION_FILE_SUFFIX);
-  private final String dataRegionId;
+  private final int dataRegionId;
   private final DeletionBuffer deletionBuffer;
   private final File storageDir;
   private final Map<AbstractDeleteDataNode, DeletionResource> 
deleteNode2ResourcesMap =
@@ -70,7 +70,7 @@ public class DeletionResourceManager implements AutoCloseable 
{
   private final Condition recoveryReadyCondition = recoverLock.newCondition();
   private volatile boolean hasCompletedRecovery = false;
 
-  private DeletionResourceManager(String dataRegionId) throws IOException {
+  private DeletionResourceManager(int dataRegionId) throws IOException {
     this.dataRegionId = dataRegionId;
     this.storageDir =
         new File(
@@ -269,7 +269,7 @@ public class DeletionResourceManager implements 
AutoCloseable {
 
   //////////////////////////// singleton ////////////////////////////
   private static class DeletionResourceManagerHolder {
-    private static Map<String, DeletionResourceManager> 
CONSENSU_GROUP_ID_2_INSTANCE_MAP;
+    private static Map<Integer, DeletionResourceManager> 
CONSENSU_GROUP_ID_2_INSTANCE_MAP;
 
     private DeletionResourceManagerHolder() {}
 
@@ -280,7 +280,7 @@ public class DeletionResourceManager implements 
AutoCloseable {
     }
   }
 
-  public static DeletionResourceManager getInstance(String groupId) {
+  public static DeletionResourceManager getInstance(int groupId) {
     // If consensusImpl is not PipeConsensus.
     if (DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP == 
null) {
       return null;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
index f401cbbc76a..b77105720ba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
@@ -75,7 +75,7 @@ public class PageCacheDeletionBuffer implements 
DeletionBuffer {
                   ? 0
                   : (o1.getProgressIndex().isAfter(o2.getProgressIndex()) ? 1 
: -1));
   // Data region id
-  private final String dataRegionId;
+  private final int dataRegionId;
   // directory to store .deletion files
   private final String baseDirectory;
   // single thread to serialize WALEntry to workingBuffer
@@ -99,7 +99,7 @@ public class PageCacheDeletionBuffer implements 
DeletionBuffer {
   // maxProgressIndex of each batch increases in the same order as the 
physical time.
   private ProgressIndex maxProgressIndexInCurrentFile = 
MinimumProgressIndex.INSTANCE;
 
-  public PageCacheDeletionBuffer(String dataRegionId, String baseDirectory) {
+  public PageCacheDeletionBuffer(int dataRegionId, String baseDirectory) {
     this.dataRegionId = dataRegionId;
     this.baseDirectory = baseDirectory;
     allocateBuffers();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java
index 943693e7b00..1fa00cd0a0e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java
@@ -40,13 +40,13 @@ public class DeletionReader implements Closeable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DeletionReader.class);
   private static final int MAGIC_STRING_BYTES_SIZE =
       
DeletionResourceManager.MAGIC_VERSION_V1.getBytes(StandardCharsets.UTF_8).length;
-  private final String regionId;
+  private final int regionId;
   private final Consumer<DeletionResource> removeHook;
   private final File logFile;
   private final FileInputStream fileInputStream;
   private final FileChannel fileChannel;
 
-  public DeletionReader(File logFile, String regionId, 
Consumer<DeletionResource> removeHook)
+  public DeletionReader(File logFile, int regionId, Consumer<DeletionResource> 
removeHook)
       throws IOException {
     this.logFile = logFile;
     this.regionId = regionId;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index 468292b8ecc..e90ae1952aa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -41,7 +41,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeHeartbeatEvent.class);
 
-  private final String dataRegionId;
+  private final int dataRegionId;
 
   private long timePublished;
   private long timeAssigned;
@@ -62,7 +62,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
 
   private final boolean shouldPrintMessage;
 
-  public PipeHeartbeatEvent(final String dataRegionId, final boolean 
shouldPrintMessage) {
+  public PipeHeartbeatEvent(final int dataRegionId, final boolean 
shouldPrintMessage) {
     super(null, 0, null, null, null, null, null, null, true, Long.MIN_VALUE, 
Long.MAX_VALUE);
     this.dataRegionId = dataRegionId;
     this.shouldPrintMessage = shouldPrintMessage;
@@ -72,7 +72,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final String dataRegionId,
+      final int dataRegionId,
       final long timePublished,
       final boolean shouldPrintMessage) {
     super(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index acc62f7e7a4..540589326ff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -57,7 +57,7 @@ public class PipeRealtimeEventFactory {
   }
 
   public static PipeRealtimeEvent createRealtimeEvent(
-      final String dataRegionId, final boolean shouldPrintMessage) {
+      final int dataRegionId, final boolean shouldPrintMessage) {
     return new PipeRealtimeEvent(
         new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java
index 7f89cdc5372..58f8e31da26 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java
@@ -41,7 +41,7 @@ public class PipeAssignerMetrics implements IMetricSet {
 
   private AbstractMetricService metricService;
 
-  private final Map<String, PipeDataRegionAssigner> assignerMap = new 
HashMap<>();
+  private final Map<Integer, PipeDataRegionAssigner> assignerMap = new 
HashMap<>();
 
   //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
 
@@ -49,44 +49,44 @@ public class PipeAssignerMetrics implements IMetricSet {
   public void bindTo(AbstractMetricService metricService) {
     this.metricService = metricService;
     synchronized (this) {
-      for (String dataRegionId : assignerMap.keySet()) {
+      for (Integer dataRegionId : assignerMap.keySet()) {
         createMetrics(dataRegionId);
       }
     }
   }
 
-  private void createMetrics(String dataRegionId) {
+  private void createMetrics(int dataRegionId) {
     createAutoGauge(dataRegionId);
   }
 
-  private void createAutoGauge(String dataRegionId) {
+  private void createAutoGauge(int dataRegionId) {
     metricService.createAutoGauge(
         Metric.UNASSIGNED_HEARTBEAT_COUNT.toString(),
         MetricLevel.IMPORTANT,
         assignerMap.get(dataRegionId),
         PipeDataRegionAssigner::getPipeHeartbeatEventCount,
         Tag.REGION.toString(),
-        dataRegionId);
+        Integer.toString(dataRegionId));
     metricService.createAutoGauge(
         Metric.UNASSIGNED_TABLET_COUNT.toString(),
         MetricLevel.IMPORTANT,
         assignerMap.get(dataRegionId),
         PipeDataRegionAssigner::getTabletInsertionEventCount,
         Tag.REGION.toString(),
-        dataRegionId);
+        Integer.toString(dataRegionId));
     metricService.createAutoGauge(
         Metric.UNASSIGNED_TSFILE_COUNT.toString(),
         MetricLevel.IMPORTANT,
         assignerMap.get(dataRegionId),
         PipeDataRegionAssigner::getTsFileInsertionEventCount,
         Tag.REGION.toString(),
-        dataRegionId);
+        Integer.toString(dataRegionId));
   }
 
   @Override
   public void unbindFrom(AbstractMetricService metricService) {
-    ImmutableSet<String> dataRegionIds = 
ImmutableSet.copyOf(assignerMap.keySet());
-    for (String dataRegionId : dataRegionIds) {
+    ImmutableSet<Integer> dataRegionIds = 
ImmutableSet.copyOf(assignerMap.keySet());
+    for (int dataRegionId : dataRegionIds) {
       deregister(dataRegionId);
     }
     if (!assignerMap.isEmpty()) {
@@ -94,32 +94,32 @@ public class PipeAssignerMetrics implements IMetricSet {
     }
   }
 
-  private void removeMetrics(String dataRegionId) {
+  private void removeMetrics(int dataRegionId) {
     removeAutoGauge(dataRegionId);
   }
 
-  private void removeAutoGauge(String dataRegionId) {
+  private void removeAutoGauge(int dataRegionId) {
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.UNASSIGNED_HEARTBEAT_COUNT.toString(),
         Tag.REGION.toString(),
-        dataRegionId);
+        Integer.toString(dataRegionId));
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.UNASSIGNED_TABLET_COUNT.toString(),
         Tag.REGION.toString(),
-        dataRegionId);
+        Integer.toString(dataRegionId));
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.UNASSIGNED_TSFILE_COUNT.toString(),
         Tag.REGION.toString(),
-        dataRegionId);
+        Integer.toString(dataRegionId));
   }
 
   //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
 
   public void register(PipeDataRegionAssigner pipeDataRegionAssigner) {
-    String dataRegionId = pipeDataRegionAssigner.getDataRegionId();
+    int dataRegionId = pipeDataRegionAssigner.getDataRegionId();
     synchronized (this) {
       assignerMap.putIfAbsent(dataRegionId, pipeDataRegionAssigner);
       if (Objects.nonNull(metricService)) {
@@ -128,7 +128,7 @@ public class PipeAssignerMetrics implements IMetricSet {
     }
   }
 
-  public void deregister(String dataRegionId) {
+  public void deregister(int dataRegionId) {
     synchronized (this) {
       if (!assignerMap.containsKey(dataRegionId)) {
         LOGGER.warn(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index ab74d094740..8aac47b01c7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -498,7 +498,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
         extractTsFiles(dataRegion, startHistoricalExtractionTime, 
originalResourceList);
       }
       if (shouldExtractDeletion) {
-        
Optional.ofNullable(DeletionResourceManager.getInstance(String.valueOf(dataRegionId)))
+        Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId))
             .ifPresent(manager -> extractDeletions(manager, 
originalResourceList));
       }
 
@@ -946,7 +946,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
           dataRegionId,
           event);
     } else {
-      
Optional.ofNullable(DeletionResourceManager.getInstance(String.valueOf(dataRegionId)))
+      Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId))
           .ifPresent(
               manager ->
                   event.setDeletionResource(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index 3dec073f057..3f0d5f4687a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -91,7 +91,7 @@ public abstract class PipeRealtimeDataRegionSource implements 
PipeExtractor {
 
   protected String pipeName;
   protected long creationTime;
-  protected int dataRegionId;
+  protected int dataRegionId = -1;
   protected PipeTaskMeta pipeTaskMeta;
 
   protected boolean shouldExtractInsertion;
@@ -317,7 +317,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
 
   @Override
   public void close() throws Exception {
-    if (Objects.nonNull(dataRegionId)) {
+    if (dataRegionId >= 0) {
       
PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId, 
this);
       PipeTimePartitionListener.getInstance().stopListen(dataRegionId, this);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 8ea973fbf0a..e60e1ad898d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -66,24 +66,24 @@ public class PipeDataRegionAssigner implements Closeable {
   /** The {@link DisruptorQueue} is used to assign the event to the extractor. 
*/
   private final DisruptorQueue disruptor;
 
-  private final String dataRegionId;
+  private final int dataRegionId;
 
   private Boolean isTableModel;
 
   private final PipeEventCounter eventCounter = new 
PipeDataRegionEventCounter();
 
-  public String getDataRegionId() {
+  public int getDataRegionId() {
     return dataRegionId;
   }
 
-  public PipeDataRegionAssigner(final String dataRegionId) {
+  public PipeDataRegionAssigner(final int dataRegionId) {
     this.matcher = new CachedSchemaPatternMatcher();
     this.disruptor = new DisruptorQueue(this::assignToExtractor, 
this::onAssignedHook);
     this.dataRegionId = dataRegionId;
     PipeAssignerMetrics.getInstance().register(this);
 
     final DataRegion dataRegion =
-        StorageEngine.getInstance().getDataRegion(new 
DataRegionId(Integer.parseInt(dataRegionId)));
+        StorageEngine.getInstance().getDataRegion(new 
DataRegionId(dataRegionId));
     if (Objects.nonNull(dataRegion)) {
       final String databaseName = dataRegion.getDatabaseName();
       if (Objects.nonNull(databaseName)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index 8c49dd0a3dd..8fb95d78fc4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -48,7 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * will filter events and assign them to different 
PipeRealtimeEventDataRegionExtractors.
  */
 public class PipeInsertionDataNodeListener {
-  private final ConcurrentMap<String, PipeDataRegionAssigner> 
dataRegionId2Assigner =
+  private final ConcurrentMap<Integer, PipeDataRegionAssigner> 
dataRegionId2Assigner =
       new ConcurrentHashMap<>();
 
   private final AtomicInteger listenToTsFileExtractorCount = new 
AtomicInteger(0);
@@ -57,7 +57,7 @@ public class PipeInsertionDataNodeListener {
   //////////////////////////// start & stop ////////////////////////////
 
   public synchronized void startListenAndAssign(
-      final String dataRegionId, final PipeRealtimeDataRegionSource extractor) 
{
+      final int dataRegionId, final PipeRealtimeDataRegionSource extractor) {
     dataRegionId2Assigner
         .computeIfAbsent(dataRegionId, o -> new 
PipeDataRegionAssigner(dataRegionId))
         .startAssignTo(extractor);
@@ -71,7 +71,7 @@ public class PipeInsertionDataNodeListener {
   }
 
   public synchronized void stopListenAndAssign(
-      final String dataRegionId, final PipeRealtimeDataRegionSource extractor) 
{
+      final int dataRegionId, final PipeRealtimeDataRegionSource extractor) {
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
     if (assigner == null) {
       return;
@@ -139,7 +139,7 @@ public class PipeInsertionDataNodeListener {
   }
 
   public DeletionResource listenToDeleteData(
-      final String regionId, final AbstractDeleteDataNode node) {
+      final int regionId, final AbstractDeleteDataNode node) {
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(regionId);
     // only events from registered data region will be extracted
     if (assigner == null
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index d95d51e390e..8a76f891679 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2009,7 +2009,7 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   public void deleteDALFolderAndClose() {
-    
Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionIdString))
+    
Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId.getId()))
         .ifPresent(
             manager -> {
               manager.close();
@@ -2823,7 +2823,8 @@ public class DataRegion implements IDataRegionForQuery {
       deleteDataInUnsealedFiles(unsealedTsFileResource, deletion, 
sealedTsFileResource);
       // capture deleteDataNode and wait it to be persisted to DAL.
       DeletionResource deletionResource =
-          
PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionIdString,
 node);
+          PipeInsertionDataNodeListener.getInstance()
+              .listenToDeleteData(dataRegionId.getId(), node);
       // just get result. We have already waited for result in 
`listenToDeleteData`
       if (deletionResource != null && deletionResource.waitForResult() == 
Status.FAILURE) {
         throw deletionResource.getCause();
@@ -2926,7 +2927,8 @@ public class DataRegion implements IDataRegionForQuery {
 
       // capture deleteDataNode and wait it to be persisted to DAL.
       DeletionResource deletionResource =
-          
PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionIdString,
 node);
+          PipeInsertionDataNodeListener.getInstance()
+              .listenToDeleteData(dataRegionId.getId(), node);
       // just get result. We have already waited for result in 
`listenToDeleteData`
       if (deletionResource != null && deletionResource.waitForResult() == 
Status.FAILURE) {
         throw deletionResource.getCause();
@@ -2983,7 +2985,8 @@ public class DataRegion implements IDataRegionForQuery {
       deleteDataDirectlyInFile(unsealedTsFileResource, deletion);
       // capture deleteDataNode and wait it to be persisted to DAL.
       DeletionResource deletionResource =
-          
PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionIdString,
 node);
+          PipeInsertionDataNodeListener.getInstance()
+              .listenToDeleteData(dataRegionId.getId(), node);
       // just get result. We have already waited for result in 
`listenToDeleteData`
       if (deletionResource != null && deletionResource.waitForResult() == 
Status.FAILURE) {
         throw deletionResource.getCause();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
index 06f823c0e23..c7a93b9d072 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
@@ -42,7 +42,7 @@ import java.io.File;
 import java.util.Collections;
 
 public class DeletionRecoverTest {
-  private static final String[] FAKE_DATA_REGION_IDS = {"2", "3"};
+  private static final Integer[] FAKE_DATA_REGION_IDS = {2, 3};
   private static final int THIS_DATANODE_ID =
       IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
   private static final String DELETION_BASE_DIR =
@@ -50,7 +50,7 @@ public class DeletionRecoverTest {
   private final int deletionCount = 10;
   private DeletionResourceManager deletionResourceManager;
 
-  public void setUp(boolean isRelational, String FAKE_DATA_REGION_ID) throws 
Exception {
+  public void setUp(boolean isRelational, int FAKE_DATA_REGION_ID) throws 
Exception {
     File baseDir = new File(DELETION_BASE_DIR + File.separator + 
FAKE_DATA_REGION_ID);
     if (baseDir.exists()) {
       FileUtils.deleteFileOrDirectory(baseDir);
@@ -84,7 +84,7 @@ public class DeletionRecoverTest {
 
   @After
   public void tearDown() throws Exception {
-    for (String FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) {
+    for (Integer FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) {
       File baseDir = new File(DELETION_BASE_DIR + File.separator + 
FAKE_DATA_REGION_ID);
       if (baseDir.exists()) {
         FileUtils.deleteFileOrDirectory(baseDir);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
index f94d909f94b..aa7c5c0c954 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
@@ -66,7 +66,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 public class DeletionResourceTest {
-  private static final String[] FAKE_DATA_REGION_IDS = {"2", "3", "4", "5", 
"6"};
+  private static final Integer[] FAKE_DATA_REGION_IDS = {2, 3, 4, 5, 6};
   private static final String DELETION_BASE_DIR =
       
IoTDBDescriptor.getInstance().getConfig().getIotConsensusV2DeletionFileDir();
   private static final int THIS_DATANODE_ID = 0;
@@ -84,7 +84,7 @@ public class DeletionResourceTest {
   @After
   public void tearDown() throws Exception {
     
IoTDBDescriptor.getInstance().getConfig().setDataNodeId(previousDataNodeId);
-    for (String FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) {
+    for (Integer FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) {
       File baseDir = new File(DELETION_BASE_DIR + File.separator + 
FAKE_DATA_REGION_ID);
       if (baseDir.exists()) {
         FileUtils.deleteFileOrDirectory(baseDir);
@@ -212,7 +212,7 @@ public class DeletionResourceTest {
           new PipeDeleteDataNodeEvent(
               deleteDataNode, "Test", 10, null, null, null, null, null, null, 
true, true);
       deletionEvent.setCommitterKeyAndCommitId(
-          new CommitterKey("Test", 10, 
Integer.parseInt(FAKE_DATA_REGION_IDS[3]), 0), i + 1);
+          new CommitterKey("Test", 10, FAKE_DATA_REGION_IDS[3], 0), i + 1);
       deletionEvents.add(deletionEvent);
 
       final DeletionResource deletionResource =
@@ -227,8 +227,7 @@ public class DeletionResourceTest {
 
     // for event commit to invoke onCommit() to removeDAL
     if (initialIndex == 0) {
-      PipeEventCommitManager.getInstance()
-          .register("Test", 10, Integer.parseInt(FAKE_DATA_REGION_IDS[3]), 
"Test");
+      PipeEventCommitManager.getInstance().register("Test", 10, 
FAKE_DATA_REGION_IDS[3], "Test");
     }
     deletionEvents.forEach(deletionEvent -> 
deletionEvent.increaseReferenceCount("test"));
     final List<Path> paths =
@@ -255,7 +254,7 @@ public class DeletionResourceTest {
   @Test
   public void testWaitForResult() throws Exception {
     // prepare pipe component
-    final PipeRealtimeDataRegionSource extractor = new 
PipeRealtimeDataRegionHybridSource();
+    final PipeRealtimeDataRegionSource source = new 
PipeRealtimeDataRegionHybridSource();
     final PipeParameters parameters =
         new PipeParameters(
             new HashMap<String, String>() {
@@ -265,13 +264,12 @@ public class DeletionResourceTest {
             });
     final PipeTaskRuntimeConfiguration configuration =
         new PipeTaskRuntimeConfiguration(
-            new PipeTaskSourceRuntimeEnvironment(
-                "1", 1, Integer.parseInt(FAKE_DATA_REGION_IDS[4]), null));
-    extractor.customize(parameters, configuration);
-    Assert.assertTrue(extractor.shouldExtractDeletion());
+            new PipeTaskSourceRuntimeEnvironment("1", 1, 
FAKE_DATA_REGION_IDS[4], null));
+    source.customize(parameters, configuration);
+    Assert.assertTrue(source.shouldExtractDeletion());
 
     PipeInsertionDataNodeListener.getInstance()
-        .startListenAndAssign(FAKE_DATA_REGION_IDS[4], extractor);
+        .startListenAndAssign(FAKE_DATA_REGION_IDS[4], source);
     deletionResourceManager = 
DeletionResourceManager.getInstance(FAKE_DATA_REGION_IDS[4]);
     final int rebootTimes = 0;
     final MeasurementPath path = new MeasurementPath("root.vehicle.d2.s0");
@@ -285,6 +283,6 @@ public class DeletionResourceTest {
     Assert.assertSame(Status.SUCCESS, deletionResource.waitForResult());
     // close pipe resource
     PipeInsertionDataNodeListener.getInstance()
-        .stopListenAndAssign(FAKE_DATA_REGION_IDS[4], extractor);
+        .stopListenAndAssign(FAKE_DATA_REGION_IDS[4], source);
   }
 }


Reply via email to