This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e9dfaa620f0 Pipe: retry historical resources with stable replicate
index (#17655)
e9dfaa620f0 is described below
commit e9dfaa620f017c9200cd8b1c950227372ecd71e4
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu May 14 10:47:44 2026 +0800
Pipe: retry historical resources with stable replicate index (#17655)
---
...istoricalDataRegionTsFileAndDeletionSource.java | 149 +++++++++++++--------
...ricalDataRegionTsFileAndDeletionSourceTest.java | 72 ++++++++++
2 files changed, 165 insertions(+), 56 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 6da04faeb3a..558b38f03a7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -36,6 +36,7 @@ import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvi
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.datastructure.resource.PersistentResource;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.commons.queryengine.utils.DateTimeUtils;
import org.apache.iotdb.commons.utils.PathUtils;
@@ -164,6 +165,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
private Queue<PersistentResource> pendingQueue;
private final Map<TsFileResource, Set<String>>
filteredTsFileResources2TableNames =
new HashMap<>();
+ private final Map<PersistentResource, Long>
pendingResource2ReplicateIndexForIoTV2 =
+ new HashMap<>();
@Override
public void validate(final PipeParameterValidator validator) {
@@ -810,18 +813,27 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSource
return null;
}
- final PersistentResource resource = pendingQueue.poll();
+ final PersistentResource resource = pendingQueue.peek();
if (resource == null) {
return supplyTerminateEvent();
}
if (resource instanceof TsFileResource) {
final TsFileResource tsFileResource = (TsFileResource) resource;
- return consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)
- ? supplyProgressReportEvent(tsFileResource.getMaxProgressIndex())
- : supplyTsFileEvent(tsFileResource);
+ if (consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)) {
+ clearReplicateIndexForResource(tsFileResource);
+ pendingQueue.poll();
+ return supplyProgressReportEvent(tsFileResource.getMaxProgressIndex());
+ }
+
+ final Event event = supplyTsFileEvent(tsFileResource);
+ pendingQueue.poll();
+ return event;
}
- return supplyDeletionEvent((DeletionResource) resource);
+
+ final Event event = supplyDeletionEvent((DeletionResource) resource);
+ pendingQueue.poll();
+ return event;
}
private Event supplyTerminateEvent() {
@@ -891,50 +903,52 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSource
protected Event supplyTsFileEvent(final TsFileResource resource) {
if (!filteredTsFileResources2TableNames.containsKey(resource)) {
+ clearReplicateIndexForResource(resource);
return supplyProgressReportEvent(resource.getMaxProgressIndex());
}
- final PipeTsFileInsertionEvent event =
- new PipeTsFileInsertionEvent(
- isModelDetected ? isTableModel : null,
- resource.getDatabaseName(),
- resource,
- null,
- shouldTransferModFile,
- false,
- true,
- filteredTsFileResources2TableNames.get(resource),
+ boolean shouldUnpinResource = false;
+ boolean shouldClearReplicateIndex = false;
+ try {
+ final PipeTsFileInsertionEvent event =
+ new PipeTsFileInsertionEvent(
+ isModelDetected ? isTableModel : null,
+ resource.getDatabaseName(),
+ resource,
+ null,
+ shouldTransferModFile,
+ false,
+ true,
+ filteredTsFileResources2TableNames.get(resource),
+ pipeName,
+ creationTime,
+ pipeTaskMeta,
+ treePattern,
+ tablePattern,
+ userId,
+ userName,
+ cliHostname,
+ skipIfNoPrivileges,
+ historicalDataExtractionStartTime,
+ historicalDataExtractionEndTime);
+
+ // if using IoTV2, assign a replicateIndex for this event
+ if (shouldAssignReplicateIndexForIoTV2(event)) {
+
event.setReplicateIndexForIoTV2(assignReplicateIndexForResource(resource));
+ LOGGER.debug(
+ "[{}]Set {} for historical event {}",
pipeName,
- creationTime,
- pipeTaskMeta,
- treePattern,
- tablePattern,
- userId,
- userName,
- cliHostname,
- skipIfNoPrivileges,
- historicalDataExtractionStartTime,
- historicalDataExtractionEndTime);
-
- filteredTsFileResources2TableNames.remove(resource);
-
- // if using IoTV2, assign a replicateIndex for this event
- if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2
- && IoTConsensusV2Processor.isShouldReplicate(event)) {
- event.setReplicateIndexForIoTV2(
-
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(pipeName));
- LOGGER.debug(
- "[{}]Set {} for historical event {}", pipeName,
event.getReplicateIndexForIoTV2(), event);
- }
+ event.getReplicateIndexForIoTV2(),
+ event);
+ }
- if (sloppyPattern || isDbNameCoveredByPattern) {
- event.skipParsingPattern();
- }
- if (sloppyTimeRange || isTsFileResourceCoveredByTimeRange(resource)) {
- event.skipParsingTime();
- }
+ if (sloppyPattern || isDbNameCoveredByPattern) {
+ event.skipParsingPattern();
+ }
+ if (sloppyTimeRange || isTsFileResourceCoveredByTimeRange(resource)) {
+ event.skipParsingTime();
+ }
- try {
final boolean isReferenceCountIncreased =
event.increaseReferenceCount(
PipeHistoricalDataRegionTsFileAndDeletionSource.class.getName());
@@ -945,17 +959,25 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSource
dataRegionId,
event);
}
+ filteredTsFileResources2TableNames.remove(resource);
+ shouldUnpinResource = true;
+ shouldClearReplicateIndex = true;
return isReferenceCountIncreased ? event : null;
} finally {
- try {
- PipeDataNodeResourceManager.tsfile()
- .unpinTsFileResource(resource, shouldTransferModFile, pipeName);
- } catch (final IOException e) {
- LOGGER.warn(
- "Pipe {}@{}: failed to unpin TsFileResource after creating event,
original path: {}",
- pipeName,
- dataRegionId,
- resource.getTsFilePath());
+ if (shouldClearReplicateIndex) {
+ clearReplicateIndexForResource(resource);
+ }
+ if (shouldUnpinResource) {
+ try {
+ PipeDataNodeResourceManager.tsfile()
+ .unpinTsFileResource(resource, shouldTransferModFile, pipeName);
+ } catch (final IOException e) {
+ LOGGER.warn(
+ "Pipe {}@{}: failed to unpin TsFileResource after creating
event, original path: {}",
+ pipeName,
+ dataRegionId,
+ resource.getTsFilePath());
+ }
}
}
}
@@ -983,10 +1005,8 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSource
false);
// if using IoTV2, assign a replicateIndex for this historical deletion
event
- if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2
- && IoTConsensusV2Processor.isShouldReplicate(event)) {
- event.setReplicateIndexForIoTV2(
-
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(pipeName));
+ if (shouldAssignReplicateIndexForIoTV2(event)) {
+
event.setReplicateIndexForIoTV2(assignReplicateIndexForResource(deletionResource));
LOGGER.debug(
"[{}]Set {} for historical deletion event {}",
pipeName,
@@ -1017,9 +1037,25 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSource
event.setDeletionResource(
manager.getDeletionResource(event.getDeleteDataNode())));
}
+ clearReplicateIndexForResource(deletionResource);
return isReferenceCountIncreased ? event : null;
}
+ protected boolean shouldAssignReplicateIndexForIoTV2(final EnrichedEvent
event) {
+ return DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2
+ && IoTConsensusV2Processor.isShouldReplicate(event);
+ }
+
+ protected long assignReplicateIndexForResource(final PersistentResource
resource) {
+ return pendingResource2ReplicateIndexForIoTV2.computeIfAbsent(
+ resource,
+ ignored ->
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(pipeName));
+ }
+
+ protected void clearReplicateIndexForResource(final PersistentResource
resource) {
+ pendingResource2ReplicateIndexForIoTV2.remove(resource);
+ }
+
@Override
public synchronized boolean hasConsumedAll() {
// If the pendingQueue is null when the function is called, it implies
that the extractor only
@@ -1055,5 +1091,6 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSource
pendingQueue.clear();
pendingQueue = null;
}
+ pendingResource2ReplicateIndexForIoTV2.clear();
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
index 93580e639eb..b54efc07f79 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvi
import org.apache.iotdb.commons.pipe.datastructure.resource.PersistentResource;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -135,6 +136,65 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSourceTest {
}
}
+ @Test
+ public void testSupplyRetriesSameTsFileAfterEventCreationFailure() throws
Exception {
+ final TestablePipeHistoricalDataRegionTsFileAndDeletionSource source =
+ new TestablePipeHistoricalDataRegionTsFileAndDeletionSource();
+ final Event expectedEvent = new Event() {};
+ final RuntimeException expectedException = new RuntimeException("mock
supply failure");
+ final File tempDir =
Files.createTempDirectory("pipeHistoricalRetry").toFile();
+
+ try {
+ final TsFileResource firstResource = createTsFileResource(tempDir,
"first.tsfile");
+ final TsFileResource secondResource = createTsFileResource(tempDir,
"second.tsfile");
+
+ source.setSuppliedEvent(expectedEvent);
+ source.setFailureBeforeSuccess(expectedException, 1);
+ setPrivateField(source, "hasBeenStarted", true);
+ setPrivateField(
+ source,
+ "pendingQueue",
+ new ArrayDeque<PersistentResource>(Arrays.asList(firstResource,
secondResource)));
+
+ final RuntimeException actualException =
+ Assert.assertThrows(RuntimeException.class, source::supply);
+ Assert.assertSame(expectedException, actualException);
+ Assert.assertEquals(
+ Arrays.asList(firstResource.getTsFilePath()),
source.getSuppliedTsFiles());
+ Assert.assertEquals(2, source.getPendingQueueSize());
+
+ Assert.assertSame(expectedEvent, source.supply());
+ Assert.assertEquals(
+ Arrays.asList(firstResource.getTsFilePath(),
firstResource.getTsFilePath()),
+ source.getSuppliedTsFiles());
+ Assert.assertEquals(1, source.getPendingQueueSize());
+ } finally {
+ FileUtils.deleteFileOrDirectory(tempDir);
+ }
+ }
+
+ @Test
+ public void testReplicateIndexShouldBeStableBeforeResourceConsumed() throws
Exception {
+ final TestablePipeHistoricalDataRegionTsFileAndDeletionSource source =
+ new TestablePipeHistoricalDataRegionTsFileAndDeletionSource();
+ final File tempDir =
Files.createTempDirectory("pipeHistoricalReplicateIndex").toFile();
+
+ try {
+ final TsFileResource resource = createTsFileResource(tempDir,
"stable.tsfile");
+ final String pipeName = "consensus_pipe_retry_test_" + System.nanoTime();
+ setPrivateField(source, "pipeName", pipeName);
+ ReplicateProgressDataNodeManager.resetReplicateIndexForIoTV2(pipeName);
+
+ Assert.assertEquals(1L,
source.assignReplicateIndexForResource(resource));
+ Assert.assertEquals(1L,
source.assignReplicateIndexForResource(resource));
+
+ source.clearReplicateIndexForResource(resource);
+ Assert.assertEquals(2L,
source.assignReplicateIndexForResource(resource));
+ } finally {
+ FileUtils.deleteFileOrDirectory(tempDir);
+ }
+ }
+
private static TsFileResource createTsFileResource(final File tempDir, final
String fileName)
throws IOException {
final File file = new File(tempDir, fileName);
@@ -169,6 +229,8 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSourceTest {
private final List<String> consumedSkippedTsFilePaths = new ArrayList<>();
private final List<String> suppliedTsFiles = new ArrayList<>();
private Event suppliedEvent;
+ private RuntimeException exceptionToThrow;
+ private int remainingFailureCount;
private void setSkippedTsFilePaths(final String... skippedTsFilePaths) {
this.skippedTsFilePaths.clear();
@@ -199,6 +261,12 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSourceTest {
this.suppliedEvent = suppliedEvent;
}
+ private void setFailureBeforeSuccess(
+ final RuntimeException exceptionToThrow, final int
remainingFailureCount) {
+ this.exceptionToThrow = exceptionToThrow;
+ this.remainingFailureCount = remainingFailureCount;
+ }
+
@Override
protected boolean consumeSkippedHistoricalTsFileEventIfNecessary(
final TsFileResource resource) {
@@ -212,6 +280,10 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSourceTest {
@Override
protected Event supplyTsFileEvent(final TsFileResource resource) {
suppliedTsFiles.add(resource.getTsFilePath());
+ if (remainingFailureCount > 0) {
+ remainingFailureCount--;
+ throw exceptionToThrow;
+ }
return suppliedEvent;
}
}