This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch int-refactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/int-refactor by this push:
     new e83510467c1 refactor
e83510467c1 is described below

commit e83510467c1a8995bde52d878683dcbe03f7224d
Author: Caideyipi <[email protected]>
AuthorDate: Tue Feb 24 10:34:50 2026 +0800

    refactor
---
 .../agent/task/subtask/sink/PipeSinkSubtask.java   |  3 +-
 .../pipe/consensus/deletion/DeletionResource.java  |  9 +--
 .../deletion/DeletionResourceManager.java          | 20 +++---
 .../deletion/persist/PageCacheDeletionBuffer.java  |  4 +-
 .../consensus/deletion/recover/DeletionReader.java |  4 +-
 .../event/common/heartbeat/PipeHeartbeatEvent.java | 80 +++++++++++-----------
 .../tsfile/PipeCompactedTsFileInsertionEvent.java  |  4 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  3 +-
 .../event/realtime/PipeRealtimeEventFactory.java   |  2 +-
 .../db/pipe/metric/source/PipeAssignerMetrics.java | 32 ++++-----
 ...istoricalDataRegionTsFileAndDeletionSource.java |  4 +-
 .../realtime/PipeRealtimeDataRegionSource.java     |  8 +--
 .../realtime/assigner/PipeDataRegionAssigner.java  | 75 ++++++++++----------
 .../PipeTsFileEpochProgressIndexKeeper.java        |  8 +--
 .../listener/PipeInsertionDataNodeListener.java    | 12 ++--
 .../listener/PipeTimePartitionListener.java        | 21 +++---
 .../db/storageengine/dataregion/DataRegion.java    | 17 +++--
 .../dataregion/memtable/TsFileProcessor.java       |  8 +--
 .../rescon/memory/TimePartitionManager.java        |  2 +-
 .../db/pipe/consensus/DeletionRecoverTest.java     |  6 +-
 .../db/pipe/consensus/DeletionResourceTest.java    | 12 ++--
 .../db/pipe/source/PipeRealtimeExtractTest.java    | 32 +++------
 22 files changed, 174 insertions(+), 192 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 96c519ce636..91bccc6cbda 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -62,8 +62,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
   // to trigger the general event transfer function, causing potentially such 
as
   // the random delay of the batch transmission. Therefore, here we inject 
cron events
   // when no event can be pulled.
-  public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT =
-      new PipeHeartbeatEvent("cron", false);
+  public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT = new 
PipeHeartbeatEvent(-1, false);
 
   public PipeSinkSubtask(
       final String taskID,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
index 3bf5c1287b7..520b57a229f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
@@ -58,15 +58,12 @@ public class DeletionResource implements PersistentResource 
{
   private volatile Exception cause;
 
   public DeletionResource(
-      AbstractDeleteDataNode deleteDataNode,
-      Consumer<DeletionResource> removeHook,
-      String regionId) {
+      AbstractDeleteDataNode deleteDataNode, Consumer<DeletionResource> 
removeHook, int regionId) {
     this.deleteDataNode = deleteDataNode;
     this.removeHook = removeHook;
     this.currentStatus = Status.RUNNING;
     this.consensusGroupId =
-        ConsensusGroupId.Factory.create(
-            TConsensusGroupType.DataRegion.getValue(), 
Integer.parseInt(regionId));
+        
ConsensusGroupId.Factory.create(TConsensusGroupType.DataRegion.getValue(), 
regionId);
     this.pipeTaskReferenceCount =
         new AtomicInteger(
             
DataRegionConsensusImpl.getInstance().getReplicationNum(consensusGroupId) - 1);
@@ -151,7 +148,7 @@ public class DeletionResource implements PersistentResource 
{
   }
 
   public static DeletionResource deserialize(
-      final ByteBuffer buffer, final String regionId, final 
Consumer<DeletionResource> removeHook)
+      final ByteBuffer buffer, final int regionId, final 
Consumer<DeletionResource> removeHook)
       throws IOException {
     AbstractDeleteDataNode node = DeleteNodeType.deserializeFromDAL(buffer);
     return new DeletionResource(node, removeHook, regionId);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
index ff868ffd445..8e1dc8bb470 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
@@ -61,7 +61,7 @@ public class DeletionResourceManager implements AutoCloseable 
{
       String.format(
           "^_(?<%s>\\d+)-(?<%s>\\d+)\\%s$",
           REBOOT_TIME, MEM_TABLE_FLUSH_ORDER, DELETION_FILE_SUFFIX);
-  private final String dataRegionId;
+  private final int dataRegionId;
   private final DeletionBuffer deletionBuffer;
   private final File storageDir;
   private final Map<AbstractDeleteDataNode, DeletionResource> 
deleteNode2ResourcesMap =
@@ -70,7 +70,7 @@ public class DeletionResourceManager implements AutoCloseable 
{
   private final Condition recoveryReadyCondition = recoverLock.newCondition();
   private volatile boolean hasCompletedRecovery = false;
 
-  private DeletionResourceManager(String dataRegionId) throws IOException {
+  private DeletionResourceManager(int dataRegionId) throws IOException {
     this.dataRegionId = dataRegionId;
     this.storageDir =
         new File(
@@ -269,23 +269,23 @@ public class DeletionResourceManager implements 
AutoCloseable {
 
   //////////////////////////// singleton ////////////////////////////
   private static class DeletionResourceManagerHolder {
-    private static Map<String, DeletionResourceManager> 
CONSENSU_GROUP_ID_2_INSTANCE_MAP;
+    private static Map<Integer, DeletionResourceManager> 
CONSENSUS_GROUP_ID_2_INSTANCE_MAP;
 
     private DeletionResourceManagerHolder() {}
 
     public static void build() {
-      if (CONSENSU_GROUP_ID_2_INSTANCE_MAP == null) {
-        CONSENSU_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>();
+      if (CONSENSUS_GROUP_ID_2_INSTANCE_MAP == null) {
+        CONSENSUS_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>();
       }
     }
   }
 
-  public static DeletionResourceManager getInstance(String groupId) {
+  public static DeletionResourceManager getInstance(int groupId) {
     // If consensusImpl is not PipeConsensus.
-    if (DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP == 
null) {
+    if (DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP == 
null) {
       return null;
     }
-    return 
DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent(
+    return 
DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent(
         groupId,
         key -> {
           try {
@@ -305,10 +305,10 @@ public class DeletionResourceManager implements 
AutoCloseable {
   }
 
   public static void exit() {
-    if (DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP == 
null) {
+    if (DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP == 
null) {
       return;
     }
-    DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.forEach(
+    DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP.forEach(
         (groupId, resourceManager) -> {
           resourceManager.close();
         });
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
index f401cbbc76a..b77105720ba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
@@ -75,7 +75,7 @@ public class PageCacheDeletionBuffer implements 
DeletionBuffer {
                   ? 0
                   : (o1.getProgressIndex().isAfter(o2.getProgressIndex()) ? 1 
: -1));
   // Data region id
-  private final String dataRegionId;
+  private final int dataRegionId;
   // directory to store .deletion files
   private final String baseDirectory;
   // single thread to serialize WALEntry to workingBuffer
@@ -99,7 +99,7 @@ public class PageCacheDeletionBuffer implements 
DeletionBuffer {
   // maxProgressIndex of each batch increases in the same order as the 
physical time.
   private ProgressIndex maxProgressIndexInCurrentFile = 
MinimumProgressIndex.INSTANCE;
 
-  public PageCacheDeletionBuffer(String dataRegionId, String baseDirectory) {
+  public PageCacheDeletionBuffer(int dataRegionId, String baseDirectory) {
     this.dataRegionId = dataRegionId;
     this.baseDirectory = baseDirectory;
     allocateBuffers();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java
index 943693e7b00..1fa00cd0a0e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java
@@ -40,13 +40,13 @@ public class DeletionReader implements Closeable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DeletionReader.class);
   private static final int MAGIC_STRING_BYTES_SIZE =
       
DeletionResourceManager.MAGIC_VERSION_V1.getBytes(StandardCharsets.UTF_8).length;
-  private final String regionId;
+  private final int regionId;
   private final Consumer<DeletionResource> removeHook;
   private final File logFile;
   private final FileInputStream fileInputStream;
   private final FileChannel fileChannel;
 
-  public DeletionReader(File logFile, String regionId, 
Consumer<DeletionResource> removeHook)
+  public DeletionReader(File logFile, int regionId, Consumer<DeletionResource> 
removeHook)
       throws IOException {
     this.logFile = logFile;
     this.regionId = regionId;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index 468292b8ecc..8ed7317d2bc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -41,7 +41,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeHeartbeatEvent.class);
 
-  private final String dataRegionId;
+  private final int dataRegionId;
 
   private long timePublished;
   private long timeAssigned;
@@ -52,17 +52,17 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
   // The disruptor is usually nearly empty.
   private int disruptorSize;
 
-  private int extractorQueueTabletSize;
-  private int extractorQueueTsFileSize;
-  private int extractorQueueSize;
+  private int sourceQueueTabletSize;
+  private int sourceQueueTsFileSize;
+  private int sourceQueueSize;
 
-  private int connectorQueueTabletSize;
-  private int connectorQueueTsFileSize;
-  private int connectorQueueSize;
+  private int sinkQueueTabletSize;
+  private int sinkQueueTsFileSize;
+  private int sinkQueueSize;
 
   private final boolean shouldPrintMessage;
 
-  public PipeHeartbeatEvent(final String dataRegionId, final boolean 
shouldPrintMessage) {
+  public PipeHeartbeatEvent(final int dataRegionId, final boolean 
shouldPrintMessage) {
     super(null, 0, null, null, null, null, null, null, true, Long.MIN_VALUE, 
Long.MAX_VALUE);
     this.dataRegionId = dataRegionId;
     this.shouldPrintMessage = shouldPrintMessage;
@@ -72,7 +72,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final String dataRegionId,
+      final int dataRegionId,
       final long timePublished,
       final boolean shouldPrintMessage) {
     super(
@@ -104,7 +104,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
   @Override
   public boolean internallyDecreaseResourceReferenceCount(final String 
holderMessage) {
     // PipeName == null indicates that the event is the raw event at disruptor,
-    // not the event copied and passed to the extractor
+    // not the event copied and passed to the source
     if (Objects.nonNull(pipeName)) {
       PipeDataNodeSinglePipeMetrics.getInstance()
           .decreaseHeartbeatEventCount(pipeName, creationTime);
@@ -208,17 +208,17 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
 
   public void recordExtractorQueueSize(final 
UnboundedBlockingPendingQueue<Event> pendingQueue) {
     if (shouldPrintMessage) {
-      extractorQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
-      extractorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
-      extractorQueueSize = pendingQueue.size();
+      sourceQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
+      sourceQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
+      sourceQueueSize = pendingQueue.size();
     }
   }
 
   public void recordConnectorQueueSize(final 
UnboundedBlockingPendingQueue<Event> pendingQueue) {
     if (shouldPrintMessage) {
-      connectorQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
-      connectorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
-      connectorQueueSize = pendingQueue.size();
+      sinkQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
+      sinkQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
+      sinkQueueSize = pendingQueue.size();
     }
   }
 
@@ -259,19 +259,19 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
 
     final String disruptorSizeMessage = Integer.toString(disruptorSize);
 
-    final String extractorQueueTabletSizeMessage =
-        timeAssigned != 0 ? Integer.toString(extractorQueueTabletSize) : 
unknownMessage;
-    final String extractorQueueTsFileSizeMessage =
-        timeAssigned != 0 ? Integer.toString(extractorQueueTsFileSize) : 
unknownMessage;
-    final String extractorQueueSizeMessage =
-        timeAssigned != 0 ? Integer.toString(extractorQueueSize) : 
unknownMessage;
+    final String sourceQueueTabletSizeMessage =
+        timeAssigned != 0 ? Integer.toString(sourceQueueTabletSize) : 
unknownMessage;
+    final String sourceQueueTsFileSizeMessage =
+        timeAssigned != 0 ? Integer.toString(sourceQueueTsFileSize) : 
unknownMessage;
+    final String sourceQueueSizeMessage =
+        timeAssigned != 0 ? Integer.toString(sourceQueueSize) : unknownMessage;
 
-    final String connectorQueueTabletSizeMessage =
-        timeProcessed != 0 ? Integer.toString(connectorQueueTabletSize) : 
unknownMessage;
-    final String connectorQueueTsFileSizeMessage =
-        timeProcessed != 0 ? Integer.toString(connectorQueueTsFileSize) : 
unknownMessage;
-    final String connectorQueueSizeMessage =
-        timeProcessed != 0 ? Integer.toString(connectorQueueSize) : 
unknownMessage;
+    final String sinkQueueTabletSizeMessage =
+        timeProcessed != 0 ? Integer.toString(sinkQueueTabletSize) : 
unknownMessage;
+    final String sinkQueueTsFileSizeMessage =
+        timeProcessed != 0 ? Integer.toString(sinkQueueTsFileSize) : 
unknownMessage;
+    final String sinkQueueSizeMessage =
+        timeProcessed != 0 ? Integer.toString(sinkQueueSize) : unknownMessage;
 
     return "PipeHeartbeatEvent{"
         + "pipeName='"
@@ -290,18 +290,18 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
         + totalTimeMessage
         + ", disruptorSize="
         + disruptorSizeMessage
-        + ", extractorQueueTabletSize="
-        + extractorQueueTabletSizeMessage
-        + ", extractorQueueTsFileSize="
-        + extractorQueueTsFileSizeMessage
-        + ", extractorQueueSize="
-        + extractorQueueSizeMessage
-        + ", connectorQueueTabletSize="
-        + connectorQueueTabletSizeMessage
-        + ", connectorQueueTsFileSize="
-        + connectorQueueTsFileSizeMessage
-        + ", connectorQueueSize="
-        + connectorQueueSizeMessage
+        + ", sourceQueueTabletSize="
+        + sourceQueueTabletSizeMessage
+        + ", sourceQueueTsFileSize="
+        + sourceQueueTsFileSizeMessage
+        + ", sourceQueueSize="
+        + sourceQueueSizeMessage
+        + ", sinkQueueTabletSize="
+        + sinkQueueTabletSizeMessage
+        + ", sinkQueueTsFileSize="
+        + sinkQueueTsFileSizeMessage
+        + ", sinkQueueSize="
+        + sinkQueueSizeMessage
         + "}";
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
index bf3f2c97acb..cdcb8734503 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
@@ -38,7 +38,7 @@ import java.util.stream.Collectors;
 
 public class PipeCompactedTsFileInsertionEvent extends 
PipeTsFileInsertionEvent {
 
-  private final String dataRegionId;
+  private final int dataRegionId;
   private final Set<String> originFilePaths;
   private final List<Long> commitIds;
 
@@ -70,7 +70,7 @@ public class PipeCompactedTsFileInsertionEvent extends 
PipeTsFileInsertionEvent
         anyOfOriginalEvents.getStartTime(),
         anyOfOriginalEvents.getEndTime());
 
-    this.dataRegionId = String.valueOf(committerKey.getRegionId());
+    this.dataRegionId = committerKey.getRegionId();
     this.originFilePaths =
         originalEvents.stream()
             .map(PipeTsFileInsertionEvent::getTsFile)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 6b3e505d2ea..2904e3e37cc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -400,7 +400,8 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
   public void eliminateProgressIndex() {
     if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource)) {
       PipeTsFileEpochProgressIndexKeeper.getInstance()
-          .eliminateProgressIndex(resource.getDataRegionId(), pipeName, 
resource.getTsFilePath());
+          .eliminateProgressIndex(
+              Integer.parseInt(resource.getDataRegionId()), pipeName, 
resource.getTsFilePath());
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index acc62f7e7a4..540589326ff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -57,7 +57,7 @@ public class PipeRealtimeEventFactory {
   }
 
   public static PipeRealtimeEvent createRealtimeEvent(
-      final String dataRegionId, final boolean shouldPrintMessage) {
+      final int dataRegionId, final boolean shouldPrintMessage) {
     return new PipeRealtimeEvent(
         new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java
index 7f89cdc5372..59e4e892d8e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java
@@ -41,7 +41,7 @@ public class PipeAssignerMetrics implements IMetricSet {
 
   private AbstractMetricService metricService;
 
-  private final Map<String, PipeDataRegionAssigner> assignerMap = new 
HashMap<>();
+  private final Map<Integer, PipeDataRegionAssigner> assignerMap = new 
HashMap<>();
 
   //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
 
@@ -49,44 +49,44 @@ public class PipeAssignerMetrics implements IMetricSet {
   public void bindTo(AbstractMetricService metricService) {
     this.metricService = metricService;
     synchronized (this) {
-      for (String dataRegionId : assignerMap.keySet()) {
+      for (int dataRegionId : assignerMap.keySet()) {
         createMetrics(dataRegionId);
       }
     }
   }
 
-  private void createMetrics(String dataRegionId) {
+  private void createMetrics(int dataRegionId) {
     createAutoGauge(dataRegionId);
   }
 
-  private void createAutoGauge(String dataRegionId) {
+  private void createAutoGauge(int dataRegionId) {
     metricService.createAutoGauge(
         Metric.UNASSIGNED_HEARTBEAT_COUNT.toString(),
         MetricLevel.IMPORTANT,
         assignerMap.get(dataRegionId),
         PipeDataRegionAssigner::getPipeHeartbeatEventCount,
         Tag.REGION.toString(),
-        dataRegionId);
+        Integer.toString(dataRegionId));
     metricService.createAutoGauge(
         Metric.UNASSIGNED_TABLET_COUNT.toString(),
         MetricLevel.IMPORTANT,
         assignerMap.get(dataRegionId),
         PipeDataRegionAssigner::getTabletInsertionEventCount,
         Tag.REGION.toString(),
-        dataRegionId);
+        Integer.toString(dataRegionId));
     metricService.createAutoGauge(
         Metric.UNASSIGNED_TSFILE_COUNT.toString(),
         MetricLevel.IMPORTANT,
         assignerMap.get(dataRegionId),
         PipeDataRegionAssigner::getTsFileInsertionEventCount,
         Tag.REGION.toString(),
-        dataRegionId);
+        Integer.toString(dataRegionId));
   }
 
   @Override
   public void unbindFrom(AbstractMetricService metricService) {
-    ImmutableSet<String> dataRegionIds = 
ImmutableSet.copyOf(assignerMap.keySet());
-    for (String dataRegionId : dataRegionIds) {
+    ImmutableSet<Integer> dataRegionIds = 
ImmutableSet.copyOf(assignerMap.keySet());
+    for (int dataRegionId : dataRegionIds) {
       deregister(dataRegionId);
     }
     if (!assignerMap.isEmpty()) {
@@ -94,32 +94,32 @@ public class PipeAssignerMetrics implements IMetricSet {
     }
   }
 
-  private void removeMetrics(String dataRegionId) {
+  private void removeMetrics(int dataRegionId) {
     removeAutoGauge(dataRegionId);
   }
 
-  private void removeAutoGauge(String dataRegionId) {
+  private void removeAutoGauge(int dataRegionId) {
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.UNASSIGNED_HEARTBEAT_COUNT.toString(),
         Tag.REGION.toString(),
-        dataRegionId);
+        Integer.toString(dataRegionId));
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.UNASSIGNED_TABLET_COUNT.toString(),
         Tag.REGION.toString(),
-        dataRegionId);
+        Integer.toString(dataRegionId));
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.UNASSIGNED_TSFILE_COUNT.toString(),
         Tag.REGION.toString(),
-        dataRegionId);
+        Integer.toString(dataRegionId));
   }
 
   //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
 
   public void register(PipeDataRegionAssigner pipeDataRegionAssigner) {
-    String dataRegionId = pipeDataRegionAssigner.getDataRegionId();
+    int dataRegionId = pipeDataRegionAssigner.getDataRegionId();
     synchronized (this) {
       assignerMap.putIfAbsent(dataRegionId, pipeDataRegionAssigner);
       if (Objects.nonNull(metricService)) {
@@ -128,7 +128,7 @@ public class PipeAssignerMetrics implements IMetricSet {
     }
   }
 
-  public void deregister(String dataRegionId) {
+  public void deregister(final int dataRegionId) {
     synchronized (this) {
       if (!assignerMap.containsKey(dataRegionId)) {
         LOGGER.warn(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index ab74d094740..8aac47b01c7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -498,7 +498,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
         extractTsFiles(dataRegion, startHistoricalExtractionTime, 
originalResourceList);
       }
       if (shouldExtractDeletion) {
-        
Optional.ofNullable(DeletionResourceManager.getInstance(String.valueOf(dataRegionId)))
+        Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId))
             .ifPresent(manager -> extractDeletions(manager, 
originalResourceList));
       }
 
@@ -946,7 +946,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
           dataRegionId,
           event);
     } else {
-      
Optional.ofNullable(DeletionResourceManager.getInstance(String.valueOf(dataRegionId)))
+      Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId))
           .ifPresent(
               manager ->
                   event.setDeletionResource(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index cd00e2975a0..de0ee264473 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -91,7 +91,7 @@ public abstract class PipeRealtimeDataRegionSource implements 
PipeExtractor {
 
   protected String pipeName;
   protected long creationTime;
-  protected String dataRegionId;
+  protected int dataRegionId = -1;
   protected PipeTaskMeta pipeTaskMeta;
 
   protected boolean shouldExtractInsertion;
@@ -214,7 +214,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
     shouldExtractDeletion = insertionDeletionListeningOptionPair.getRight();
 
     pipeName = environment.getPipeName();
-    dataRegionId = String.valueOf(environment.getRegionId());
+    dataRegionId = environment.getRegionId();
     pipeTaskMeta = environment.getPipeTaskMeta();
 
     // Metrics related to TsFileEpoch are managed in PipeExtractorMetrics. 
These metrics are
@@ -317,7 +317,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
 
   @Override
   public void close() throws Exception {
-    if (Objects.nonNull(dataRegionId)) {
+    if (dataRegionId >= 0) {
       
PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId, 
this);
       PipeTimePartitionListener.getInstance().stopListen(dataRegionId, this);
     }
@@ -555,7 +555,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
     return skipIfNoPrivileges;
   }
 
-  public final String getDataRegionId() {
+  public final int getDataRegionId() {
     return dataRegionId;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 8ea973fbf0a..0f4193af7a9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -58,32 +58,32 @@ public class PipeDataRegionAssigner implements Closeable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeDataRegionAssigner.class);
 
   /**
-   * The {@link PipeDataRegionMatcher} is used to match the event with the 
extractor based on the
+   * The {@link PipeDataRegionMatcher} is used to match the event with the 
source based on the
    * pattern.
    */
   private final PipeDataRegionMatcher matcher;
 
-  /** The {@link DisruptorQueue} is used to assign the event to the extractor. 
*/
+  /** The {@link DisruptorQueue} is used to assign the event to the source. */
   private final DisruptorQueue disruptor;
 
-  private final String dataRegionId;
+  private final int dataRegionId;
 
   private Boolean isTableModel;
 
   private final PipeEventCounter eventCounter = new 
PipeDataRegionEventCounter();
 
-  public String getDataRegionId() {
+  public int getDataRegionId() {
     return dataRegionId;
   }
 
-  public PipeDataRegionAssigner(final String dataRegionId) {
+  public PipeDataRegionAssigner(final int dataRegionId) {
     this.matcher = new CachedSchemaPatternMatcher();
-    this.disruptor = new DisruptorQueue(this::assignToExtractor, 
this::onAssignedHook);
+    this.disruptor = new DisruptorQueue(this::assignToSource, 
this::onAssignedHook);
     this.dataRegionId = dataRegionId;
     PipeAssignerMetrics.getInstance().register(this);
 
     final DataRegion dataRegion =
-        StorageEngine.getInstance().getDataRegion(new 
DataRegionId(Integer.parseInt(dataRegionId)));
+        StorageEngine.getInstance().getDataRegion(new 
DataRegionId(dataRegionId));
     if (Objects.nonNull(dataRegion)) {
       final String databaseName = dataRegion.getDatabaseName();
       if (Objects.nonNull(databaseName)) {
@@ -128,7 +128,7 @@ public class PipeDataRegionAssigner implements Closeable {
     eventCounter.decreaseEventCount(innerEvent);
   }
 
-  private void assignToExtractor(
+  private void assignToSource(
       final PipeRealtimeEvent event, final long sequence, final boolean 
endOfBatch) {
     if (disruptor.isClosed()) {
       return;
@@ -140,17 +140,15 @@ public class PipeDataRegionAssigner implements Closeable {
     matchedAndUnmatched
         .getLeft()
         .forEach(
-            extractor -> {
+            source -> {
               if (disruptor.isClosed()) {
                 return;
               }
 
-              if (event.getEvent().isGeneratedByPipe() && 
!extractor.isForwardingPipeRequests()) {
+              if (event.getEvent().isGeneratedByPipe() && 
!source.isForwardingPipeRequests()) {
                 final ProgressReportEvent reportEvent =
                     new ProgressReportEvent(
-                        extractor.getPipeName(),
-                        extractor.getCreationTime(),
-                        extractor.getPipeTaskMeta());
+                        source.getPipeName(), source.getCreationTime(), 
source.getPipeTaskMeta());
                 reportEvent.bindProgressIndex(event.getProgressIndex());
                 if 
(!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
                   LOGGER.warn(
@@ -158,33 +156,33 @@ public class PipeDataRegionAssigner implements Closeable {
                       reportEvent);
                   return;
                 }
-                
extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
+                
source.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
                 return;
               }
 
               final PipeRealtimeEvent copiedEvent =
                   event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-                      extractor.getPipeName(),
-                      extractor.getCreationTime(),
-                      extractor.getPipeTaskMeta(),
-                      extractor.getTreePattern(),
-                      extractor.getTablePattern(),
-                      String.valueOf(extractor.getUserId()),
-                      extractor.getUserName(),
-                      extractor.getCliHostname(),
-                      extractor.isSkipIfNoPrivileges(),
-                      extractor.getRealtimeDataExtractionStartTime(),
-                      extractor.getRealtimeDataExtractionEndTime());
+                      source.getPipeName(),
+                      source.getCreationTime(),
+                      source.getPipeTaskMeta(),
+                      source.getTreePattern(),
+                      source.getTablePattern(),
+                      String.valueOf(source.getUserId()),
+                      source.getUserName(),
+                      source.getCliHostname(),
+                      source.isSkipIfNoPrivileges(),
+                      source.getRealtimeDataExtractionStartTime(),
+                      source.getRealtimeDataExtractionEndTime());
               final EnrichedEvent innerEvent = copiedEvent.getEvent();
               // if using IoTV2, assign a replicateIndex for this realtime 
event
               if (DataRegionConsensusImpl.getInstance() instanceof 
PipeConsensus
                   && PipeConsensusProcessor.isShouldReplicate(innerEvent)) {
                 innerEvent.setReplicateIndexForIoTV2(
                     
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(
-                        extractor.getPipeName()));
+                        source.getPipeName()));
                 LOGGER.debug(
                     "[{}]Set {} for realtime event {}",
-                    extractor.getPipeName(),
+                    source.getPipeName(),
                     innerEvent.getReplicateIndexForIoTV2(),
                     innerEvent);
               }
@@ -192,15 +190,14 @@ public class PipeDataRegionAssigner implements Closeable {
               if (innerEvent instanceof PipeTsFileInsertionEvent) {
                 final PipeTsFileInsertionEvent tsFileInsertionEvent =
                     (PipeTsFileInsertionEvent) innerEvent;
-                tsFileInsertionEvent.disableMod4NonTransferPipes(
-                    extractor.isShouldTransferModFile());
+                
tsFileInsertionEvent.disableMod4NonTransferPipes(source.isShouldTransferModFile());
               }
 
               if (innerEvent instanceof PipeDeleteDataNodeEvent) {
                 final PipeDeleteDataNodeEvent deleteDataNodeEvent =
                     (PipeDeleteDataNodeEvent) innerEvent;
                 final DeletionResourceManager manager =
-                    
DeletionResourceManager.getInstance(extractor.getDataRegionId());
+                    
DeletionResourceManager.getInstance(source.getDataRegionId());
                 // increase deletion resource's reference and bind real 
deleteEvent
                 if (Objects.nonNull(manager)
                     && DeletionResource.isDeleteNodeGeneratedInLocalByIoTV2(
@@ -217,13 +214,13 @@ public class PipeDataRegionAssigner implements Closeable {
                     copiedEvent);
                 return;
               }
-              extractor.extract(copiedEvent);
+              source.extract(copiedEvent);
             });
 
     matchedAndUnmatched
         .getRight()
         .forEach(
-            extractor -> {
+            source -> {
               if (disruptor.isClosed()) {
                 return;
               }
@@ -233,9 +230,7 @@ public class PipeDataRegionAssigner implements Closeable {
                   || innerEvent instanceof TsFileInsertionEvent) {
                 final ProgressReportEvent reportEvent =
                     new ProgressReportEvent(
-                        extractor.getPipeName(),
-                        extractor.getCreationTime(),
-                        extractor.getPipeTaskMeta());
+                        source.getPipeName(), source.getCreationTime(), 
source.getPipeTaskMeta());
                 reportEvent.bindProgressIndex(event.getProgressIndex());
                 if 
(!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
                   LOGGER.warn(
@@ -243,17 +238,17 @@ public class PipeDataRegionAssigner implements Closeable {
                       reportEvent);
                   return;
                 }
-                
extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
+                
source.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
               }
             });
   }
 
-  public void startAssignTo(final PipeRealtimeDataRegionSource extractor) {
-    matcher.register(extractor);
+  public void startAssignTo(final PipeRealtimeDataRegionSource source) {
+    matcher.register(source);
   }
 
-  public void stopAssignTo(final PipeRealtimeDataRegionSource extractor) {
-    matcher.deregister(extractor);
+  public void stopAssignTo(final PipeRealtimeDataRegionSource source) {
+    matcher.deregister(source);
   }
 
   public void invalidateCache() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
index ff7d90c377d..bf15dcdc547 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
@@ -32,11 +32,11 @@ import java.util.concurrent.ConcurrentHashMap;
 public class PipeTsFileEpochProgressIndexKeeper {
 
   // data region id -> pipeName -> tsFile path -> max progress index
-  private final Map<String, Map<String, Map<String, TsFileResource>>> 
progressIndexKeeper =
+  private final Map<Integer, Map<String, Map<String, TsFileResource>>> 
progressIndexKeeper =
       new ConcurrentHashMap<>();
 
   public synchronized void registerProgressIndex(
-      final String dataRegionId, final String pipeName, final TsFileResource 
resource) {
+      final int dataRegionId, final String pipeName, final TsFileResource 
resource) {
     progressIndexKeeper
         .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
         .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
@@ -44,7 +44,7 @@ public class PipeTsFileEpochProgressIndexKeeper {
   }
 
   public synchronized void eliminateProgressIndex(
-      final String dataRegionId, final @Nonnull String pipeName, final String 
filePath) {
+      final int dataRegionId, final @Nonnull String pipeName, final String 
filePath) {
     progressIndexKeeper
         .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
         .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
@@ -52,7 +52,7 @@ public class PipeTsFileEpochProgressIndexKeeper {
   }
 
   public synchronized boolean isProgressIndexAfterOrEquals(
-      final String dataRegionId,
+      final int dataRegionId,
       final String pipeName,
       final String tsFilePath,
       final ProgressIndex progressIndex) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index 8c49dd0a3dd..3cce521a51e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -48,7 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * will filter events and assign them to different 
PipeRealtimeEventDataRegionExtractors.
  */
 public class PipeInsertionDataNodeListener {
-  private final ConcurrentMap<String, PipeDataRegionAssigner> 
dataRegionId2Assigner =
+  private final ConcurrentMap<Integer, PipeDataRegionAssigner> 
dataRegionId2Assigner =
       new ConcurrentHashMap<>();
 
   private final AtomicInteger listenToTsFileExtractorCount = new 
AtomicInteger(0);
@@ -57,7 +57,7 @@ public class PipeInsertionDataNodeListener {
   //////////////////////////// start & stop ////////////////////////////
 
   public synchronized void startListenAndAssign(
-      final String dataRegionId, final PipeRealtimeDataRegionSource extractor) 
{
+      final int dataRegionId, final PipeRealtimeDataRegionSource extractor) {
     dataRegionId2Assigner
         .computeIfAbsent(dataRegionId, o -> new 
PipeDataRegionAssigner(dataRegionId))
         .startAssignTo(extractor);
@@ -71,7 +71,7 @@ public class PipeInsertionDataNodeListener {
   }
 
   public synchronized void stopListenAndAssign(
-      final String dataRegionId, final PipeRealtimeDataRegionSource extractor) 
{
+      final int dataRegionId, final PipeRealtimeDataRegionSource extractor) {
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
     if (assigner == null) {
       return;
@@ -97,7 +97,7 @@ public class PipeInsertionDataNodeListener {
   //////////////////////////// listen to events ////////////////////////////
 
   public void listenToTsFile(
-      final String dataRegionId,
+      final int dataRegionId,
       final String databaseName,
       final TsFileResource tsFileResource,
       final boolean isLoaded) {
@@ -118,7 +118,7 @@ public class PipeInsertionDataNodeListener {
   }
 
   public void listenToInsertNode(
-      final String dataRegionId,
+      final int dataRegionId,
       final String databaseName,
       final InsertNode insertNode,
       final TsFileResource tsFileResource) {
@@ -139,7 +139,7 @@ public class PipeInsertionDataNodeListener {
   }
 
   public DeletionResource listenToDeleteData(
-      final String regionId, final AbstractDeleteDataNode node) {
+      final int regionId, final AbstractDeleteDataNode node) {
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(regionId);
     // only events from registered data region will be extracted
     if (assigner == null
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
index c5e98cd1b2c..7c674a3bd1d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
@@ -30,19 +30,18 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class PipeTimePartitionListener {
 
-  private final Map<String, Map<String, PipeRealtimeDataRegionSource>> 
dataRegionId2Extractors =
+  private final Map<Integer, Map<String, PipeRealtimeDataRegionSource>> 
dataRegionId2Sources =
       new ConcurrentHashMap<>();
 
   // This variable is used to record the upper and lower bounds that each data 
region's time
   // partition ID has ever reached.
-  private final Map<String, Pair<Long, Long>> 
dataRegionId2TimePartitionIdBound =
+  private final Map<Integer, Pair<Long, Long>> 
dataRegionId2TimePartitionIdBound =
       new ConcurrentHashMap<>();
 
   //////////////////////////// start & stop ////////////////////////////
 
-  public synchronized void startListen(
-      String dataRegionId, PipeRealtimeDataRegionSource extractor) {
-    dataRegionId2Extractors
+  public synchronized void startListen(int dataRegionId, 
PipeRealtimeDataRegionSource extractor) {
+    dataRegionId2Sources
         .computeIfAbsent(dataRegionId, o -> new HashMap<>())
         .put(extractor.getTaskID(), extractor);
     // Assign the previously recorded upper and lower bounds of time partition 
to the extractor that
@@ -53,22 +52,21 @@ public class PipeTimePartitionListener {
     }
   }
 
-  public synchronized void stopListen(String dataRegionId, 
PipeRealtimeDataRegionSource extractor) {
-    Map<String, PipeRealtimeDataRegionSource> extractors =
-        dataRegionId2Extractors.get(dataRegionId);
+  public synchronized void stopListen(int dataRegionId, 
PipeRealtimeDataRegionSource extractor) {
+    Map<String, PipeRealtimeDataRegionSource> extractors = 
dataRegionId2Sources.get(dataRegionId);
     if (Objects.isNull(extractors)) {
       return;
     }
     extractors.remove(extractor.getTaskID());
     if (extractors.isEmpty()) {
-      dataRegionId2Extractors.remove(dataRegionId);
+      dataRegionId2Sources.remove(dataRegionId);
     }
   }
 
   //////////////////////////// listen to changes ////////////////////////////
 
   public synchronized void listenToTimePartitionGrow(
-      String dataRegionId, Pair<Long, Long> newTimePartitionIdBound) {
+      int dataRegionId, Pair<Long, Long> newTimePartitionIdBound) {
     boolean shouldBroadcastTimePartitionChange = false;
     Pair<Long, Long> oldTimePartitionIdBound = 
dataRegionId2TimePartitionIdBound.get(dataRegionId);
 
@@ -86,8 +84,7 @@ public class PipeTimePartitionListener {
     }
 
     if (shouldBroadcastTimePartitionChange) {
-      Map<String, PipeRealtimeDataRegionSource> extractors =
-          dataRegionId2Extractors.get(dataRegionId);
+      Map<String, PipeRealtimeDataRegionSource> extractors = 
dataRegionId2Sources.get(dataRegionId);
       if (Objects.isNull(extractors)) {
         return;
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index ef4a4910e1b..20c78fd0b8c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2019,7 +2019,7 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   public void deleteDALFolderAndClose() {
-    
Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionIdString))
+    
Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId.getId()))
         .ifPresent(
             manager -> {
               manager.close();
@@ -2833,7 +2833,8 @@ public class DataRegion implements IDataRegionForQuery {
       deleteDataInUnsealedFiles(unsealedTsFileResource, deletion, 
sealedTsFileResource);
       // capture deleteDataNode and wait it to be persisted to DAL.
       DeletionResource deletionResource =
-          
PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionIdString,
 node);
+          PipeInsertionDataNodeListener.getInstance()
+              .listenToDeleteData(dataRegionId.getId(), node);
       // just get result. We have already waited for result in 
`listenToDeleteData`
       if (deletionResource != null && deletionResource.waitForResult() == 
Status.FAILURE) {
         throw deletionResource.getCause();
@@ -2936,7 +2937,8 @@ public class DataRegion implements IDataRegionForQuery {
 
       // capture deleteDataNode and wait it to be persisted to DAL.
       DeletionResource deletionResource =
-          
PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionIdString,
 node);
+          PipeInsertionDataNodeListener.getInstance()
+              .listenToDeleteData(dataRegionId.getId(), node);
       // just get result. We have already waited for result in 
`listenToDeleteData`
       if (deletionResource != null && deletionResource.waitForResult() == 
Status.FAILURE) {
         throw deletionResource.getCause();
@@ -2993,7 +2995,8 @@ public class DataRegion implements IDataRegionForQuery {
       deleteDataDirectlyInFile(unsealedTsFileResource, deletion);
       // capture deleteDataNode and wait it to be persisted to DAL.
       DeletionResource deletionResource =
-          
PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionIdString,
 node);
+          PipeInsertionDataNodeListener.getInstance()
+              .listenToDeleteData(dataRegionId.getId(), node);
       // just get result. We have already waited for result in 
`listenToDeleteData`
       if (deletionResource != null && deletionResource.waitForResult() == 
Status.FAILURE) {
         throw deletionResource.getCause();
@@ -4164,7 +4167,7 @@ public class DataRegion implements IDataRegionForQuery {
 
     // Listen before the tsFile is added into tsFile manager to avoid it being 
compacted
     PipeInsertionDataNodeListener.getInstance()
-        .listenToTsFile(dataRegionIdString, databaseName, tsFileResource, 
true);
+        .listenToTsFile(dataRegionId.getId(), databaseName, tsFileResource, 
true);
 
     tsFileManager.add(tsFileResource, false);
 
@@ -4343,6 +4346,10 @@ public class DataRegion implements IDataRegionForQuery {
     return dataRegionIdString;
   }
 
+  public int getDataRegionId() {
+    return dataRegionId.getId();
+  }
+
   /**
    * Get the storageGroupPath with dataRegionId.
    *
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 78b33c02c0b..518df5322a9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -342,7 +342,7 @@ public class TsFileProcessor {
     }
     PipeInsertionDataNodeListener.getInstance()
         .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionIdString(),
+            dataRegionInfo.getDataRegion().getDataRegionId(),
             dataRegionInfo.getDataRegion().getDatabaseName(),
             insertRowNode,
             tsFileResource);
@@ -439,7 +439,7 @@ public class TsFileProcessor {
     }
     PipeInsertionDataNodeListener.getInstance()
         .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionIdString(),
+            dataRegionInfo.getDataRegion().getDataRegionId(),
             dataRegionInfo.getDataRegion().getDatabaseName(),
             insertRowsNode,
             tsFileResource);
@@ -612,7 +612,7 @@ public class TsFileProcessor {
     }
     PipeInsertionDataNodeListener.getInstance()
         .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionIdString(),
+            dataRegionInfo.getDataRegion().getDataRegionId(),
             dataRegionInfo.getDataRegion().getDatabaseName(),
             insertTabletNode,
             tsFileResource);
@@ -1752,7 +1752,7 @@ public class TsFileProcessor {
     // before resource serialization to avoid missing hardlink after restart
     PipeInsertionDataNodeListener.getInstance()
         .listenToTsFile(
-            dataRegionInfo.getDataRegion().getDataRegionIdString(),
+            dataRegionInfo.getDataRegion().getDataRegionId(),
             dataRegionInfo.getDataRegion().getDatabaseName(),
             tsFileResource,
             false);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
index 2e1161977f8..5ded82a36ba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
@@ -67,7 +67,7 @@ public class TimePartitionManager {
       // PipeInsertionDataNodeListener.listenToInsertNode.
       PipeTimePartitionListener.getInstance()
           .listenToTimePartitionGrow(
-              String.valueOf(timePartitionInfo.dataRegionId.getId()),
+              timePartitionInfo.dataRegionId.getId(),
               new Pair<>(
                   timePartitionInfoMapForRegion.firstKey(),
                   timePartitionInfoMapForRegion.lastKey()));
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
index 06f823c0e23..b69f2d85ef7 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
@@ -42,7 +42,7 @@ import java.io.File;
 import java.util.Collections;
 
 public class DeletionRecoverTest {
-  private static final String[] FAKE_DATA_REGION_IDS = {"2", "3"};
+  private static final int[] FAKE_DATA_REGION_IDS = {2, 3};
   private static final int THIS_DATANODE_ID =
       IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
   private static final String DELETION_BASE_DIR =
@@ -50,7 +50,7 @@ public class DeletionRecoverTest {
   private final int deletionCount = 10;
   private DeletionResourceManager deletionResourceManager;
 
-  public void setUp(boolean isRelational, String FAKE_DATA_REGION_ID) throws 
Exception {
+  public void setUp(boolean isRelational, int FAKE_DATA_REGION_ID) throws 
Exception {
     File baseDir = new File(DELETION_BASE_DIR + File.separator + 
FAKE_DATA_REGION_ID);
     if (baseDir.exists()) {
       FileUtils.deleteFileOrDirectory(baseDir);
@@ -84,7 +84,7 @@ public class DeletionRecoverTest {
 
   @After
   public void tearDown() throws Exception {
-    for (String FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) {
+    for (int FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) {
       File baseDir = new File(DELETION_BASE_DIR + File.separator + 
FAKE_DATA_REGION_ID);
       if (baseDir.exists()) {
         FileUtils.deleteFileOrDirectory(baseDir);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
index f94d909f94b..8b4ace5ec22 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
@@ -66,7 +66,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 public class DeletionResourceTest {
-  private static final String[] FAKE_DATA_REGION_IDS = {"2", "3", "4", "5", 
"6"};
+  private static final int[] FAKE_DATA_REGION_IDS = {2, 3, 4, 5, 6};
   private static final String DELETION_BASE_DIR =
       
IoTDBDescriptor.getInstance().getConfig().getIotConsensusV2DeletionFileDir();
   private static final int THIS_DATANODE_ID = 0;
@@ -84,7 +84,7 @@ public class DeletionResourceTest {
   @After
   public void tearDown() throws Exception {
     
IoTDBDescriptor.getInstance().getConfig().setDataNodeId(previousDataNodeId);
-    for (String FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) {
+    for (int FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) {
       File baseDir = new File(DELETION_BASE_DIR + File.separator + 
FAKE_DATA_REGION_ID);
       if (baseDir.exists()) {
         FileUtils.deleteFileOrDirectory(baseDir);
@@ -212,7 +212,7 @@ public class DeletionResourceTest {
           new PipeDeleteDataNodeEvent(
               deleteDataNode, "Test", 10, null, null, null, null, null, null, 
true, true);
       deletionEvent.setCommitterKeyAndCommitId(
-          new CommitterKey("Test", 10, 
Integer.parseInt(FAKE_DATA_REGION_IDS[3]), 0), i + 1);
+          new CommitterKey("Test", 10, FAKE_DATA_REGION_IDS[3], 0), i + 1);
       deletionEvents.add(deletionEvent);
 
       final DeletionResource deletionResource =
@@ -227,8 +227,7 @@ public class DeletionResourceTest {
 
     // for event commit to invoke onCommit() to removeDAL
     if (initialIndex == 0) {
-      PipeEventCommitManager.getInstance()
-          .register("Test", 10, Integer.parseInt(FAKE_DATA_REGION_IDS[3]), 
"Test");
+      PipeEventCommitManager.getInstance().register("Test", 10, 
FAKE_DATA_REGION_IDS[3], "Test");
     }
     deletionEvents.forEach(deletionEvent -> 
deletionEvent.increaseReferenceCount("test"));
     final List<Path> paths =
@@ -265,8 +264,7 @@ public class DeletionResourceTest {
             });
     final PipeTaskRuntimeConfiguration configuration =
         new PipeTaskRuntimeConfiguration(
-            new PipeTaskSourceRuntimeEnvironment(
-                "1", 1, Integer.parseInt(FAKE_DATA_REGION_IDS[4]), null));
+            new PipeTaskSourceRuntimeEnvironment("1", 1, 
FAKE_DATA_REGION_IDS[4], null));
     extractor.customize(parameters, configuration);
     Assert.assertTrue(extractor.shouldExtractDeletion());
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
index 59aec4b1f7b..9e07e91e983 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
@@ -70,8 +70,8 @@ public class PipeRealtimeExtractTest {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeRealtimeExtractTest.class);
 
-  private final String dataRegion1 = "1";
-  private final String dataRegion2 = "2";
+  private final int dataRegion1 = 1;
+  private final int dataRegion2 = 2;
   private final String pattern1 = "root.sg.d";
   private final String pattern2 = "root.sg.d.a";
   private final String[] device = new String[] {"root", "sg", "d"};
@@ -151,31 +151,19 @@ public class PipeRealtimeExtractTest {
       final PipeTaskRuntimeConfiguration configuration0 =
           new PipeTaskRuntimeConfiguration(
               new PipeTaskSourceRuntimeEnvironment(
-                  "1",
-                  1,
-                  Integer.parseInt(dataRegion1),
-                  new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
+                  "1", 1, dataRegion1, new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
       final PipeTaskRuntimeConfiguration configuration1 =
           new PipeTaskRuntimeConfiguration(
               new PipeTaskSourceRuntimeEnvironment(
-                  "1",
-                  1,
-                  Integer.parseInt(dataRegion1),
-                  new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
+                  "1", 1, dataRegion1, new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
       final PipeTaskRuntimeConfiguration configuration2 =
           new PipeTaskRuntimeConfiguration(
               new PipeTaskSourceRuntimeEnvironment(
-                  "1",
-                  1,
-                  Integer.parseInt(dataRegion2),
-                  new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
+                  "1", 1, dataRegion2, new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
       final PipeTaskRuntimeConfiguration configuration3 =
           new PipeTaskRuntimeConfiguration(
               new PipeTaskSourceRuntimeEnvironment(
-                  "1",
-                  1,
-                  Integer.parseInt(dataRegion2),
-                  new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
+                  "1", 1, dataRegion2, new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
 
       // Some parameters of extractor are validated and initialized during the 
validation process.
       extractor0.validate(new PipeParameterValidator(parameters0));
@@ -274,7 +262,7 @@ public class PipeRealtimeExtractTest {
   }
 
   private Future<?> write2DataRegion(
-      final int writeNum, final String dataRegionId, final int startNum) {
+      final int writeNum, final int dataRegionId, final int startNum) {
     final File dataRegionDir =
         new File(tsFileDir.getPath() + File.separator + dataRegionId + 
File.separator + "0");
     final boolean ignored = dataRegionDir.mkdirs();
@@ -305,7 +293,7 @@ public class PipeRealtimeExtractTest {
             PipeInsertionDataNodeListener.getInstance()
                 .listenToInsertNode(
                     dataRegionId,
-                    dataRegionId,
+                    Integer.toString(dataRegionId),
                     new InsertRowNode(
                         new PlanNodeId(String.valueOf(i)),
                         new PartialPath(device),
@@ -319,7 +307,7 @@ public class PipeRealtimeExtractTest {
             PipeInsertionDataNodeListener.getInstance()
                 .listenToInsertNode(
                     dataRegionId,
-                    dataRegionId,
+                    Integer.toString(dataRegionId),
                     new InsertRowNode(
                         new PlanNodeId(String.valueOf(i)),
                         new PartialPath(device),
@@ -331,7 +319,7 @@ public class PipeRealtimeExtractTest {
                         false),
                     resource);
             PipeInsertionDataNodeListener.getInstance()
-                .listenToTsFile(dataRegionId, dataRegionId, resource, false);
+                .listenToTsFile(dataRegionId, Integer.toString(dataRegionId), 
resource, false);
           }
         });
   }

Reply via email to