This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b8fd08984cb IoTV2: Refactor replicate index so that it is shared at the pipe task level & Add some log for delete local peer (#15815)
b8fd08984cb is described below

commit b8fd08984cb2e87aabbd98337c3ada32fbb9cfd1
Author: Peng Junzhi <[email protected]>
AuthorDate: Thu Jun 26 09:33:48 2025 +0800

    IoTV2: Refactor replicate index so that it is shared at the pipe task level & Add some log for delete local peer (#15815)
    
    * use custom replicate index for each consensus pipe
    
    * test conf
    
    * fix
    
    * Revert "test conf"
    
    This reverts commit f0f13af4f2e7edd55d561fafb6a01bf5048171dd.
    
    * fix review
---
 .../apache/iotdb/consensus/pipe/PipeConsensus.java |  3 +-
 .../pipe/consensuspipe/ConsensusPipeConnector.java |  4 +-
 .../consensuspipe/ReplicateProgressManager.java    |  2 +-
 .../pipe/metric/PipeConsensusSyncLagManager.java   | 12 +++---
 .../connector/PipeConnectorSubtaskManager.java     |  4 ++
 .../pipeconsensus/PipeConsensusAsyncConnector.java | 13 ++----
 .../ReplicateProgressDataNodeManager.java          | 21 ++++++---
 .../event/realtime/PipeRealtimeEventFactory.java   | 50 +---------------------
 ...oricalDataRegionTsFileAndDeletionExtractor.java |  8 +---
 .../realtime/assigner/PipeDataRegionAssigner.java  | 18 +++++++-
 .../listener/PipeInsertionDataNodeListener.java    | 11 ++---
 .../task/progress/PipeEventCommitManager.java      | 11 -----
 12 files changed, 55 insertions(+), 102 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index e7a10d3b9d3..56792236c23 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -375,13 +375,14 @@ public class PipeConsensus implements IConsensus {
         if (!stateMachineMap.containsKey(groupId)) {
           throw new ConsensusGroupNotExistException(groupId);
         }
-
+        LOGGER.info("[{}] start to delete local peer for group {}", 
CLASS_NAME, groupId);
         final PipeConsensusServerImpl consensus = stateMachineMap.get(groupId);
         consensus.clear();
         stateMachineMap.remove(groupId);
 
         FileUtils.deleteFileOrDirectory(new File(getPeerDir(groupId)));
         
KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE);
+        LOGGER.info("[{}] finish deleting local peer for group {}", 
CLASS_NAME, groupId);
       } finally {
         stateMachineMapLock.readLock().unlock();
       }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java
index 6f1396db972..bf4cd02b2ec 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.consensus.pipe.consensuspipe;
 
 public interface ConsensusPipeConnector {
-  long getConsensusPipeCommitProgress();
+  long getLeaderReplicateProgress();
 
-  long getConsensusPipeReplicateProgress();
+  long getFollowerApplyProgress();
 }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ReplicateProgressManager.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ReplicateProgressManager.java
index 13a18132272..9ae2964e89d 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ReplicateProgressManager.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ReplicateProgressManager.java
@@ -32,6 +32,6 @@ public interface ReplicateProgressManager {
   long getSyncLagForSpecificConsensusPipe(
       ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName);
 
-  void pinCommitIndexForMigration(
+  void pinReplicateIndexForRegionMigration(
       ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName);
 }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
index 8f6cb651ab6..0539bece918 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
@@ -48,9 +48,7 @@ public class PipeConsensusSyncLagManager {
     return 
Optional.ofNullable(consensusPipe2ConnectorMap.get(consensusPipeName))
         .map(
             consensusPipeConnector ->
-                Math.max(
-                    pinnedCommitIndex - 
consensusPipeConnector.getConsensusPipeReplicateProgress(),
-                    0L))
+                Math.max(pinnedCommitIndex - 
consensusPipeConnector.getFollowerApplyProgress(), 0L))
         .orElse(0L);
   }
 
@@ -62,16 +60,16 @@ public class PipeConsensusSyncLagManager {
     return 
Optional.ofNullable(consensusPipe2ConnectorMap.get(consensusPipeName))
         .map(
             consensusPipeConnector -> {
-              long userWriteProgress = 
consensusPipeConnector.getConsensusPipeCommitProgress();
-              long replicateProgress = 
consensusPipeConnector.getConsensusPipeReplicateProgress();
+              long userWriteProgress = 
consensusPipeConnector.getLeaderReplicateProgress();
+              long replicateProgress = 
consensusPipeConnector.getFollowerApplyProgress();
               return Math.max(userWriteProgress - replicateProgress, 0L);
             })
         .orElse(0L);
   }
 
-  public long getCurrentCommitIndex(ConsensusPipeName consensusPipeName) {
+  public long getCurrentLeaderReplicateIndex(ConsensusPipeName 
consensusPipeName) {
     return 
Optional.ofNullable(consensusPipe2ConnectorMap.get(consensusPipeName))
-        .map(ConsensusPipeConnector::getConsensusPipeCommitProgress)
+        .map(ConsensusPipeConnector::getLeaderReplicateProgress)
         .orElse(0L);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
index 3a2cd2639f4..dd461e7f952 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeE
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import 
org.apache.iotdb.db.pipe.agent.task.execution.PipeConnectorSubtaskExecutor;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
+import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -201,6 +202,9 @@ public class PipeConnectorSubtaskManager {
     }
 
     PipeEventCommitManager.getInstance().deregister(pipeName, creationTime, 
regionId);
+    // Reset IoTV2 replicate index to prevent index jumps. Do this when a 
consensus pipe no longer
+    // replicates data, since extractor and processor are already dropped now.
+    ReplicateProgressDataNodeManager.resetReplicateIndexForIoTV2(pipeName);
   }
 
   public synchronized void start(final String attributeSortedString) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 6ed86376c5a..bf1dab239f9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -28,7 +28,6 @@ import 
org.apache.iotdb.commons.client.container.IoTV2GlobalComponentContainer;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
-import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.service.metric.MetricService;
@@ -39,7 +38,6 @@ import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusDeleteEventHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTabletBatchEventHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTabletInsertNodeEventHandler;
@@ -48,6 +46,7 @@ import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.builder
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusDeleteNodeReq;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
+import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
 import org.apache.iotdb.db.pipe.consensus.metric.PipeConsensusConnectorMetrics;
 import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
@@ -726,16 +725,12 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
   }
 
   @Override
-  public long getConsensusPipeCommitProgress() {
-    return PipeEventCommitManager.getInstance()
-        .getGivenConsensusPipeCommitId(
-            consensusPipeName,
-            PipeDataNodeAgent.task().getPipeCreationTime(consensusPipeName),
-            consensusGroupId);
+  public long getLeaderReplicateProgress() {
+    return 
ReplicateProgressDataNodeManager.getReplicateIndexForIoTV2(consensusPipeName);
   }
 
   @Override
-  public long getConsensusPipeReplicateProgress() {
+  public long getFollowerApplyProgress() {
     return currentReplicateProgress;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java
index ca6153d804a..29347cfd5f1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java
@@ -44,7 +44,8 @@ import java.util.stream.Collectors;
 
 public class ReplicateProgressDataNodeManager implements 
ReplicateProgressManager {
   private static final int DATA_NODE_ID = 
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
-  private static final Map<String, AtomicLong> groupId2ReplicateIndex = new 
ConcurrentHashMap<>();
+  private static final Map<String, AtomicLong> consensusPipe2ReplicateIndex =
+      new ConcurrentHashMap<>();
   private final Map<ConsensusGroupId, ProgressIndex> groupId2MaxProgressIndex;
   private final Map<ConsensusPipeName, Long> 
consensusPipe2pinnedCommitIndexForMigration;
 
@@ -55,12 +56,20 @@ public class ReplicateProgressDataNodeManager implements 
ReplicateProgressManage
     recoverMaxProgressIndexFromDataRegion();
   }
 
-  public static long assignReplicateIndexForIoTV2(String groupId) {
-    return groupId2ReplicateIndex
-        .compute(groupId, (k, v) -> v == null ? new AtomicLong(0) : v)
+  public static long assignReplicateIndexForIoTV2(String consensusPipeName) {
+    return consensusPipe2ReplicateIndex
+        .compute(consensusPipeName, (k, v) -> v == null ? new AtomicLong(0) : 
v)
         .incrementAndGet();
   }
 
+  public static void resetReplicateIndexForIoTV2(String consensusPipeName) {
+    consensusPipe2ReplicateIndex.put(consensusPipeName, new AtomicLong(0));
+  }
+
+  public static long getReplicateIndexForIoTV2(String consensusPipeName) {
+    return consensusPipe2ReplicateIndex.getOrDefault(consensusPipeName, new 
AtomicLong(0)).get();
+  }
+
   public static ProgressIndex extractLocalSimpleProgressIndex(ProgressIndex 
progressIndex) {
     if (progressIndex instanceof RecoverProgressIndex) {
       final Map<Integer, SimpleProgressIndex> dataNodeId2LocalIndex =
@@ -151,11 +160,11 @@ public class ReplicateProgressDataNodeManager implements 
ReplicateProgressManage
   }
 
   @Override
-  public void pinCommitIndexForMigration(
+  public void pinReplicateIndexForRegionMigration(
       ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName) {
     this.consensusPipe2pinnedCommitIndexForMigration.put(
         consensusPipeName,
         PipeConsensusSyncLagManager.getInstance(consensusGroupId.toString())
-            .getCurrentCommitIndex(consensusPipeName));
+            .getCurrentLeaderReplicateIndex(consensusPipeName));
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index fa8a4e00024..cde00ee38d0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -20,15 +20,11 @@
 package org.apache.iotdb.db.pipe.event.realtime;
 
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
-import org.apache.iotdb.consensus.pipe.PipeConsensus;
-import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
-import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpochManager;
-import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -36,18 +32,12 @@ import 
org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.stream.Collectors;
 
 public class PipeRealtimeEventFactory {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeRealtimeEventFactory.class);
   private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new 
TsFileEpochManager();
 
   public static PipeRealtimeEvent createRealtimeEvent(
-      final String dataRegionId,
       final Boolean isTableModel,
       final String databaseNameFromDataRegion,
       final TsFileResource resource,
@@ -56,23 +46,10 @@ public class PipeRealtimeEventFactory {
         new PipeTsFileInsertionEvent(
             isTableModel, databaseNameFromDataRegion, resource, isLoaded, 
false);
 
-    // if using IoTV2, assign a replicateIndex for this event
-    if (DataRegionConsensusImpl.getInstance() instanceof PipeConsensus
-        && PipeConsensusProcessor.isShouldReplicate(tsFileInsertionEvent)) {
-      tsFileInsertionEvent.setReplicateIndexForIoTV2(
-          
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(dataRegionId));
-      LOGGER.info(
-          "[Region{}]Set {} for event {}",
-          dataRegionId,
-          tsFileInsertionEvent.getReplicateIndexForIoTV2(),
-          tsFileInsertionEvent);
-    }
-
     return 
TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(tsFileInsertionEvent, 
resource);
   }
 
   public static PipeRealtimeEvent createRealtimeEvent(
-      final String dataRegionId,
       final Boolean isTableModel,
       final String databaseNameFromDataRegion,
       final WALEntryHandler walEntryHandler,
@@ -98,18 +75,6 @@ public class PipeRealtimeEventFactory {
             insertNode.isAligned(),
             insertNode.isGeneratedByPipe());
 
-    // if using IoTV2, assign a replicateIndex for this event
-    if (DataRegionConsensusImpl.getInstance() instanceof PipeConsensus
-        && PipeConsensusProcessor.isShouldReplicate(insertionEvent)) {
-      insertionEvent.setReplicateIndexForIoTV2(
-          
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(dataRegionId));
-      LOGGER.info(
-          "[Region{}]Set {} for event {}",
-          dataRegionId,
-          insertionEvent.getReplicateIndexForIoTV2(),
-          insertionEvent);
-    }
-
     return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
         insertionEvent, insertNode, resource);
   }
@@ -120,23 +85,10 @@ public class PipeRealtimeEventFactory {
         new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null);
   }
 
-  public static PipeRealtimeEvent createRealtimeEvent(
-      final String dataRegionId, final AbstractDeleteDataNode node) {
+  public static PipeRealtimeEvent createRealtimeEvent(final 
AbstractDeleteDataNode node) {
     PipeDeleteDataNodeEvent deleteDataNodeEvent =
         new PipeDeleteDataNodeEvent(node, node.isGeneratedByPipe());
 
-    // if using IoTV2, assign a replicateIndex for this event
-    if (DataRegionConsensusImpl.getInstance() instanceof PipeConsensus
-        && PipeConsensusProcessor.isShouldReplicate(deleteDataNodeEvent)) {
-      deleteDataNodeEvent.setReplicateIndexForIoTV2(
-          
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(dataRegionId));
-      LOGGER.info(
-          "[Region{}]Set {} for event {}",
-          dataRegionId,
-          deleteDataNodeEvent.getReplicateIndexForIoTV2(),
-          deleteDataNodeEvent);
-    }
-
     return new PipeRealtimeEvent(deleteDataNodeEvent, null, null);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
index dc9b7318acc..79e20880689 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
@@ -877,13 +877,9 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
     if (DataRegionConsensusImpl.getInstance() instanceof PipeConsensus
         && PipeConsensusProcessor.isShouldReplicate(event)) {
       event.setReplicateIndexForIoTV2(
-          ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(
-              resource.getDataRegionId()));
+          
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(pipeName));
       LOGGER.info(
-          "[Region{}]Set {} for event {}",
-          resource.getDataRegionId(),
-          event.getReplicateIndexForIoTV2(),
-          event);
+          "[{}]Set {} for historical event {}", pipeName, 
event.getReplicateIndexForIoTV2(), event);
     }
 
     if (sloppyPattern || isDbNameCoveredByPattern) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index c76652aa0e2..060b395e51a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -27,6 +27,9 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
 import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.consensus.pipe.PipeConsensus;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
@@ -40,6 +43,7 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.CachedSche
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.PipeDataRegionMatcher;
 import org.apache.iotdb.db.pipe.metric.source.PipeAssignerMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
+import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -191,8 +195,18 @@ public class PipeDataRegionAssigner implements Closeable {
                       extractor.getRealtimeDataExtractionStartTime(),
                       extractor.getRealtimeDataExtractionEndTime());
               final EnrichedEvent innerEvent = copiedEvent.getEvent();
-              // Bind replicateIndex for IoTV2
-              
innerEvent.setReplicateIndexForIoTV2(event.getEvent().getReplicateIndexForIoTV2());
+              // if using IoTV2, assign a replicateIndex for this realtime 
event
+              if (DataRegionConsensusImpl.getInstance() instanceof 
PipeConsensus
+                  && PipeConsensusProcessor.isShouldReplicate(innerEvent)) {
+                innerEvent.setReplicateIndexForIoTV2(
+                    
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(
+                        extractor.getPipeName()));
+                LOGGER.info(
+                    "[{}]Set {} for realtime event {}",
+                    extractor.getPipeName(),
+                    innerEvent.getReplicateIndexForIoTV2(),
+                    innerEvent);
+              }
 
               if (innerEvent instanceof PipeTsFileInsertionEvent) {
                 final PipeTsFileInsertionEvent tsFileInsertionEvent =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index f1f8c22ac51..90d378ecf7d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -116,7 +116,7 @@ public class PipeInsertionDataNodeListener {
 
     assigner.publishToAssign(
         PipeRealtimeEventFactory.createRealtimeEvent(
-            dataRegionId, assigner.isTableModel(), databaseName, 
tsFileResource, isLoaded));
+            assigner.isTableModel(), databaseName, tsFileResource, isLoaded));
   }
 
   public void listenToInsertNode(
@@ -138,12 +138,7 @@ public class PipeInsertionDataNodeListener {
 
     assigner.publishToAssign(
         PipeRealtimeEventFactory.createRealtimeEvent(
-            dataRegionId,
-            assigner.isTableModel(),
-            databaseName,
-            walEntryHandler,
-            insertNode,
-            tsFileResource));
+            assigner.isTableModel(), databaseName, walEntryHandler, 
insertNode, tsFileResource));
   }
 
   public DeletionResource listenToDeleteData(
@@ -169,7 +164,7 @@ public class PipeInsertionDataNodeListener {
       deletionResource = null;
     }
 
-    
assigner.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(regionId, 
node));
+    
assigner.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(node));
 
     return deletionResource;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
index b37bd07d1d9..31ee7bfeced 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
@@ -176,17 +176,6 @@ public class PipeEventCommitManager {
     this.commitRateMarker = commitRateMarker;
   }
 
-  public long getGivenConsensusPipeCommitId(
-      final String consensusPipeName, final long creationTime, final int 
consensusGroupId) {
-    final CommitterKey committerKey =
-        generateCommitterKey(consensusPipeName, creationTime, 
consensusGroupId);
-    final PipeEventCommitter committer = eventCommitterMap.get(committerKey);
-    if (committer == null) {
-      return 0;
-    }
-    return committer.getCurrentCommitId();
-  }
-
   //////////////////////////// singleton ////////////////////////////
 
   private PipeEventCommitManager() {

Reply via email to