This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e9dfaa620f0 Pipe: retry historical resources with stable replicate index (#17655)
e9dfaa620f0 is described below

commit e9dfaa620f017c9200cd8b1c950227372ecd71e4
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu May 14 10:47:44 2026 +0800

    Pipe: retry historical resources with stable replicate index (#17655)
---
 ...istoricalDataRegionTsFileAndDeletionSource.java | 149 +++++++++++++--------
 ...ricalDataRegionTsFileAndDeletionSourceTest.java |  72 ++++++++++
 2 files changed, 165 insertions(+), 56 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 6da04faeb3a..558b38f03a7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -36,6 +36,7 @@ import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvi
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.datastructure.resource.PersistentResource;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.commons.queryengine.utils.DateTimeUtils;
 import org.apache.iotdb.commons.utils.PathUtils;
@@ -164,6 +165,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
   private Queue<PersistentResource> pendingQueue;
   private final Map<TsFileResource, Set<String>> 
filteredTsFileResources2TableNames =
       new HashMap<>();
+  private final Map<PersistentResource, Long> 
pendingResource2ReplicateIndexForIoTV2 =
+      new HashMap<>();
 
   @Override
   public void validate(final PipeParameterValidator validator) {
@@ -810,18 +813,27 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSource
       return null;
     }
 
-    final PersistentResource resource = pendingQueue.poll();
+    final PersistentResource resource = pendingQueue.peek();
     if (resource == null) {
       return supplyTerminateEvent();
     }
 
     if (resource instanceof TsFileResource) {
       final TsFileResource tsFileResource = (TsFileResource) resource;
-      return consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)
-          ? supplyProgressReportEvent(tsFileResource.getMaxProgressIndex())
-          : supplyTsFileEvent(tsFileResource);
+      if (consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)) {
+        clearReplicateIndexForResource(tsFileResource);
+        pendingQueue.poll();
+        return supplyProgressReportEvent(tsFileResource.getMaxProgressIndex());
+      }
+
+      final Event event = supplyTsFileEvent(tsFileResource);
+      pendingQueue.poll();
+      return event;
     }
-    return supplyDeletionEvent((DeletionResource) resource);
+
+    final Event event = supplyDeletionEvent((DeletionResource) resource);
+    pendingQueue.poll();
+    return event;
   }
 
   private Event supplyTerminateEvent() {
@@ -891,50 +903,52 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSource
 
   protected Event supplyTsFileEvent(final TsFileResource resource) {
     if (!filteredTsFileResources2TableNames.containsKey(resource)) {
+      clearReplicateIndexForResource(resource);
       return supplyProgressReportEvent(resource.getMaxProgressIndex());
     }
 
-    final PipeTsFileInsertionEvent event =
-        new PipeTsFileInsertionEvent(
-            isModelDetected ? isTableModel : null,
-            resource.getDatabaseName(),
-            resource,
-            null,
-            shouldTransferModFile,
-            false,
-            true,
-            filteredTsFileResources2TableNames.get(resource),
+    boolean shouldUnpinResource = false;
+    boolean shouldClearReplicateIndex = false;
+    try {
+      final PipeTsFileInsertionEvent event =
+          new PipeTsFileInsertionEvent(
+              isModelDetected ? isTableModel : null,
+              resource.getDatabaseName(),
+              resource,
+              null,
+              shouldTransferModFile,
+              false,
+              true,
+              filteredTsFileResources2TableNames.get(resource),
+              pipeName,
+              creationTime,
+              pipeTaskMeta,
+              treePattern,
+              tablePattern,
+              userId,
+              userName,
+              cliHostname,
+              skipIfNoPrivileges,
+              historicalDataExtractionStartTime,
+              historicalDataExtractionEndTime);
+
+      // if using IoTV2, assign a replicateIndex for this event
+      if (shouldAssignReplicateIndexForIoTV2(event)) {
+        
event.setReplicateIndexForIoTV2(assignReplicateIndexForResource(resource));
+        LOGGER.debug(
+            "[{}]Set {} for historical event {}",
             pipeName,
-            creationTime,
-            pipeTaskMeta,
-            treePattern,
-            tablePattern,
-            userId,
-            userName,
-            cliHostname,
-            skipIfNoPrivileges,
-            historicalDataExtractionStartTime,
-            historicalDataExtractionEndTime);
-
-    filteredTsFileResources2TableNames.remove(resource);
-
-    // if using IoTV2, assign a replicateIndex for this event
-    if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2
-        && IoTConsensusV2Processor.isShouldReplicate(event)) {
-      event.setReplicateIndexForIoTV2(
-          
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(pipeName));
-      LOGGER.debug(
-          "[{}]Set {} for historical event {}", pipeName, 
event.getReplicateIndexForIoTV2(), event);
-    }
+            event.getReplicateIndexForIoTV2(),
+            event);
+      }
 
-    if (sloppyPattern || isDbNameCoveredByPattern) {
-      event.skipParsingPattern();
-    }
-    if (sloppyTimeRange || isTsFileResourceCoveredByTimeRange(resource)) {
-      event.skipParsingTime();
-    }
+      if (sloppyPattern || isDbNameCoveredByPattern) {
+        event.skipParsingPattern();
+      }
+      if (sloppyTimeRange || isTsFileResourceCoveredByTimeRange(resource)) {
+        event.skipParsingTime();
+      }
 
-    try {
       final boolean isReferenceCountIncreased =
           event.increaseReferenceCount(
               PipeHistoricalDataRegionTsFileAndDeletionSource.class.getName());
@@ -945,17 +959,25 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSource
             dataRegionId,
             event);
       }
+      filteredTsFileResources2TableNames.remove(resource);
+      shouldUnpinResource = true;
+      shouldClearReplicateIndex = true;
       return isReferenceCountIncreased ? event : null;
     } finally {
-      try {
-        PipeDataNodeResourceManager.tsfile()
-            .unpinTsFileResource(resource, shouldTransferModFile, pipeName);
-      } catch (final IOException e) {
-        LOGGER.warn(
-            "Pipe {}@{}: failed to unpin TsFileResource after creating event, 
original path: {}",
-            pipeName,
-            dataRegionId,
-            resource.getTsFilePath());
+      if (shouldClearReplicateIndex) {
+        clearReplicateIndexForResource(resource);
+      }
+      if (shouldUnpinResource) {
+        try {
+          PipeDataNodeResourceManager.tsfile()
+              .unpinTsFileResource(resource, shouldTransferModFile, pipeName);
+        } catch (final IOException e) {
+          LOGGER.warn(
+              "Pipe {}@{}: failed to unpin TsFileResource after creating 
event, original path: {}",
+              pipeName,
+              dataRegionId,
+              resource.getTsFilePath());
+        }
       }
     }
   }
@@ -983,10 +1005,8 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSource
             false);
 
     // if using IoTV2, assign a replicateIndex for this historical deletion 
event
-    if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2
-        && IoTConsensusV2Processor.isShouldReplicate(event)) {
-      event.setReplicateIndexForIoTV2(
-          
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(pipeName));
+    if (shouldAssignReplicateIndexForIoTV2(event)) {
+      
event.setReplicateIndexForIoTV2(assignReplicateIndexForResource(deletionResource));
       LOGGER.debug(
           "[{}]Set {} for historical deletion event {}",
           pipeName,
@@ -1017,9 +1037,25 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSource
                   event.setDeletionResource(
                       manager.getDeletionResource(event.getDeleteDataNode())));
     }
+    clearReplicateIndexForResource(deletionResource);
     return isReferenceCountIncreased ? event : null;
   }
 
+  protected boolean shouldAssignReplicateIndexForIoTV2(final EnrichedEvent 
event) {
+    return DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2
+        && IoTConsensusV2Processor.isShouldReplicate(event);
+  }
+
+  protected long assignReplicateIndexForResource(final PersistentResource 
resource) {
+    return pendingResource2ReplicateIndexForIoTV2.computeIfAbsent(
+        resource,
+        ignored -> 
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(pipeName));
+  }
+
+  protected void clearReplicateIndexForResource(final PersistentResource 
resource) {
+    pendingResource2ReplicateIndexForIoTV2.remove(resource);
+  }
+
   @Override
   public synchronized boolean hasConsumedAll() {
     // If the pendingQueue is null when the function is called, it implies 
that the extractor only
@@ -1055,5 +1091,6 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSource
       pendingQueue.clear();
       pendingQueue = null;
     }
+    pendingResource2ReplicateIndexForIoTV2.clear();
   }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
index 93580e639eb..b54efc07f79 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvi
 import org.apache.iotdb.commons.pipe.datastructure.resource.PersistentResource;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -135,6 +136,65 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSourceTest {
     }
   }
 
+  @Test
+  public void testSupplyRetriesSameTsFileAfterEventCreationFailure() throws 
Exception {
+    final TestablePipeHistoricalDataRegionTsFileAndDeletionSource source =
+        new TestablePipeHistoricalDataRegionTsFileAndDeletionSource();
+    final Event expectedEvent = new Event() {};
+    final RuntimeException expectedException = new RuntimeException("mock 
supply failure");
+    final File tempDir = 
Files.createTempDirectory("pipeHistoricalRetry").toFile();
+
+    try {
+      final TsFileResource firstResource = createTsFileResource(tempDir, 
"first.tsfile");
+      final TsFileResource secondResource = createTsFileResource(tempDir, 
"second.tsfile");
+
+      source.setSuppliedEvent(expectedEvent);
+      source.setFailureBeforeSuccess(expectedException, 1);
+      setPrivateField(source, "hasBeenStarted", true);
+      setPrivateField(
+          source,
+          "pendingQueue",
+          new ArrayDeque<PersistentResource>(Arrays.asList(firstResource, 
secondResource)));
+
+      final RuntimeException actualException =
+          Assert.assertThrows(RuntimeException.class, source::supply);
+      Assert.assertSame(expectedException, actualException);
+      Assert.assertEquals(
+          Arrays.asList(firstResource.getTsFilePath()), 
source.getSuppliedTsFiles());
+      Assert.assertEquals(2, source.getPendingQueueSize());
+
+      Assert.assertSame(expectedEvent, source.supply());
+      Assert.assertEquals(
+          Arrays.asList(firstResource.getTsFilePath(), 
firstResource.getTsFilePath()),
+          source.getSuppliedTsFiles());
+      Assert.assertEquals(1, source.getPendingQueueSize());
+    } finally {
+      FileUtils.deleteFileOrDirectory(tempDir);
+    }
+  }
+
+  @Test
+  public void testReplicateIndexShouldBeStableBeforeResourceConsumed() throws 
Exception {
+    final TestablePipeHistoricalDataRegionTsFileAndDeletionSource source =
+        new TestablePipeHistoricalDataRegionTsFileAndDeletionSource();
+    final File tempDir = 
Files.createTempDirectory("pipeHistoricalReplicateIndex").toFile();
+
+    try {
+      final TsFileResource resource = createTsFileResource(tempDir, 
"stable.tsfile");
+      final String pipeName = "consensus_pipe_retry_test_" + System.nanoTime();
+      setPrivateField(source, "pipeName", pipeName);
+      ReplicateProgressDataNodeManager.resetReplicateIndexForIoTV2(pipeName);
+
+      Assert.assertEquals(1L, 
source.assignReplicateIndexForResource(resource));
+      Assert.assertEquals(1L, 
source.assignReplicateIndexForResource(resource));
+
+      source.clearReplicateIndexForResource(resource);
+      Assert.assertEquals(2L, 
source.assignReplicateIndexForResource(resource));
+    } finally {
+      FileUtils.deleteFileOrDirectory(tempDir);
+    }
+  }
+
   private static TsFileResource createTsFileResource(final File tempDir, final 
String fileName)
       throws IOException {
     final File file = new File(tempDir, fileName);
@@ -169,6 +229,8 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSourceTest {
     private final List<String> consumedSkippedTsFilePaths = new ArrayList<>();
     private final List<String> suppliedTsFiles = new ArrayList<>();
     private Event suppliedEvent;
+    private RuntimeException exceptionToThrow;
+    private int remainingFailureCount;
 
     private void setSkippedTsFilePaths(final String... skippedTsFilePaths) {
       this.skippedTsFilePaths.clear();
@@ -199,6 +261,12 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSourceTest {
       this.suppliedEvent = suppliedEvent;
     }
 
+    private void setFailureBeforeSuccess(
+        final RuntimeException exceptionToThrow, final int 
remainingFailureCount) {
+      this.exceptionToThrow = exceptionToThrow;
+      this.remainingFailureCount = remainingFailureCount;
+    }
+
     @Override
     protected boolean consumeSkippedHistoricalTsFileEventIfNecessary(
         final TsFileResource resource) {
@@ -212,6 +280,10 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSourceTest {
     @Override
     protected Event supplyTsFileEvent(final TsFileResource resource) {
       suppliedTsFiles.add(resource.getTsFilePath());
+      if (remainingFailureCount > 0) {
+        remainingFailureCount--;
+        throw exceptionToThrow;
+      }
       return suppliedEvent;
     }
   }

Reply via email to