This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new a49a826beed IoTConsensusV2: persistent isGeneratedByPipeConsensus
(#14938) (#15042)
a49a826beed is described below
commit a49a826beed78dffe16b62379d4ec778d72a6fa8
Author: Zikun Ma <[email protected]>
AuthorDate: Mon Mar 10 10:53:49 2025 +0800
IoTConsensusV2: persistent isGeneratedByPipeConsensus (#14938) (#15042)
(cherry picked from commit 93f0795233da49d1be8be77641071c8357e46ff1)
Co-authored-by: Peng Junzhi <[email protected]>
---
.../common/tsfile/PipeTsFileInsertionEvent.java | 6 +-
.../event/realtime/PipeRealtimeEventFactory.java | 4 +-
.../PipeHistoricalDataRegionTsFileExtractor.java | 1 -
.../listener/PipeInsertionDataNodeListener.java | 11 +--
.../plan/analyze/LoadTsFileAnalyzer.java | 4 +
.../plan/scheduler/load/LoadTsFileScheduler.java | 87 +++++++++++-----------
.../execute/task/CrossSpaceCompactionTask.java | 2 +-
.../execute/task/InnerSpaceCompactionTask.java | 2 +-
.../task/RepairUnsortedFileCompactionTask.java | 2 +-
.../compaction/execute/utils/CompactionUtils.java | 12 ++-
.../dataregion/tsfile/TsFileResource.java | 19 +++++
.../event/TsFileInsertionDataContainerTest.java | 13 ++++
12 files changed, 104 insertions(+), 59 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 7da01dc9e3a..15378fc6482 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -86,14 +86,12 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
public PipeTsFileInsertionEvent(
final TsFileResource resource,
final boolean isLoaded,
- final boolean isGeneratedByPipe,
final boolean isGeneratedByHistoricalExtractor) {
// The modFile must be copied before the event is assigned to the
listening pipes
this(
resource,
true,
isLoaded,
- isGeneratedByPipe,
isGeneratedByHistoricalExtractor,
null,
0,
@@ -107,7 +105,6 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
final TsFileResource resource,
final boolean isWithMod,
final boolean isLoaded,
- final boolean isGeneratedByPipe,
final boolean isGeneratedByHistoricalExtractor,
final String pipeName,
final long creationTime,
@@ -125,7 +122,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
this.modFile = this.isWithMod ? new File(modFile.getFilePath()) : null;
this.isLoaded = isLoaded;
- this.isGeneratedByPipe = isGeneratedByPipe;
+ this.isGeneratedByPipe = resource.isGeneratedByPipe();
this.isGeneratedByPipeConsensus = resource.isGeneratedByPipeConsensus();
this.isGeneratedByHistoricalExtractor = isGeneratedByHistoricalExtractor;
@@ -349,7 +346,6 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
resource,
isWithMod,
isLoaded,
- isGeneratedByPipe,
isGeneratedByHistoricalExtractor,
pipeName,
creationTime,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 1dd86e1e5d8..ef9db9a4cb0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -35,9 +35,9 @@ public class PipeRealtimeEventFactory {
private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new
TsFileEpochManager();
public static PipeRealtimeEvent createRealtimeEvent(
- final TsFileResource resource, final boolean isLoaded, final boolean
isGeneratedByPipe) {
+ final TsFileResource resource, final boolean isLoaded) {
return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(
- new PipeTsFileInsertionEvent(resource, isLoaded, isGeneratedByPipe,
false), resource);
+ new PipeTsFileInsertionEvent(resource, isLoaded, false), resource);
}
public static PipeRealtimeEvent createRealtimeEvent(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 80d43a320b3..dba9561105a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -628,7 +628,6 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
resource,
shouldTransferModFile,
false,
- false,
true,
pipeName,
creationTime,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index 3eb118701a7..452c7188dec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -95,10 +95,11 @@ public class PipeInsertionDataNodeListener {
//////////////////////////// listen to events ////////////////////////////
public void listenToTsFile(
- String dataRegionId,
- TsFileResource tsFileResource,
- boolean isLoaded,
- boolean isGeneratedByPipe) {
+ final String dataRegionId,
+ final TsFileResource tsFileResource,
+ final boolean isLoaded,
+ final boolean isGeneratedByPipe) {
+ tsFileResource.setGeneratedByPipe(isGeneratedByPipe);
// We don't judge whether listenToTsFileExtractorCount.get() == 0 here on
purpose
// because extractors may use tsfile events when some exceptions occur in
the
// insert nodes listening process.
@@ -111,7 +112,7 @@ public class PipeInsertionDataNodeListener {
}
assigner.publishToAssign(
- PipeRealtimeEventFactory.createRealtimeEvent(tsFileResource, isLoaded,
isGeneratedByPipe));
+ PipeRealtimeEventFactory.createRealtimeEvent(tsFileResource,
isLoaded));
}
public void listenToInsertNode(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index 30cd8c23389..50b63288ee8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -273,6 +273,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
tsFileResource.updatePlanIndexes(reader.getMaxPlanIndex());
} else {
tsFileResource.deserialize();
+ // Reset tsfileResource's isGeneratedByPipe mark to prevent
deserializing the wrong mark.
+ // If this tsfile is loaded by a pipe receiver, the correct mark will
be added in
+ // `listenToTsFile`
+
tsFileResource.setGeneratedByPipe(loadTsFileStatement.isGeneratedByPipe());
}
schemaAutoCreatorAndVerifier.setCurrentModificationsAndTimeIndex(tsFileResource);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 08cf36e6ada..e7d6a719ef0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -57,7 +57,6 @@ import
org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult
import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.storageengine.StorageEngine;
-import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
@@ -93,6 +92,7 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
@@ -493,48 +493,51 @@ public class LoadTsFileScheduler implements IScheduler {
}
// add metrics
- DataRegion dataRegion =
- StorageEngine.getInstance()
- .getDataRegion(
- (DataRegionId)
- ConsensusGroupId.Factory.createFromTConsensusGroupId(
- node.getLocalRegionReplicaSet().getRegionId()));
-
- dataRegion
- .getNonSystemDatabaseName()
+ Optional.ofNullable(
+ StorageEngine.getInstance()
+ .getDataRegion(
+ (DataRegionId)
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(
+ node.getLocalRegionReplicaSet().getRegionId())))
.ifPresent(
- databaseName -> {
- // Report load tsFile points to IoTDB flush metrics
- MemTableFlushTask.recordFlushPointsMetricInternal(
- node.getWritePointCount(), databaseName,
dataRegion.getDataRegionId());
-
- MetricService.getInstance()
- .count(
- node.getWritePointCount(),
- Metric.QUANTITY.toString(),
- MetricLevel.CORE,
- Tag.NAME.toString(),
- Metric.POINTS_IN.toString(),
- Tag.DATABASE.toString(),
- databaseName,
- Tag.REGION.toString(),
- dataRegion.getDataRegionId(),
- Tag.TYPE.toString(),
- Metric.LOAD_POINT_COUNT.toString());
- MetricService.getInstance()
- .count(
- node.getWritePointCount(),
- Metric.LEADER_QUANTITY.toString(),
- MetricLevel.CORE,
- Tag.NAME.toString(),
- Metric.POINTS_IN.toString(),
- Tag.DATABASE.toString(),
- databaseName,
- Tag.REGION.toString(),
- dataRegion.getDataRegionId(),
- Tag.TYPE.toString(),
- Metric.LOAD_POINT_COUNT.toString());
- });
+ dataRegion ->
+ dataRegion
+ .getNonSystemDatabaseName()
+ .ifPresent(
+ databaseName -> {
+ // Report load tsFile points to IoTDB flush metrics
+ MemTableFlushTask.recordFlushPointsMetricInternal(
+ node.getWritePointCount(),
+ databaseName,
+ dataRegion.getDataRegionId());
+
+ MetricService.getInstance()
+ .count(
+ node.getWritePointCount(),
+ Metric.QUANTITY.toString(),
+ MetricLevel.CORE,
+ Tag.NAME.toString(),
+ Metric.POINTS_IN.toString(),
+ Tag.DATABASE.toString(),
+ databaseName,
+ Tag.REGION.toString(),
+ dataRegion.getDataRegionId(),
+ Tag.TYPE.toString(),
+ Metric.LOAD_POINT_COUNT.toString());
+ MetricService.getInstance()
+ .count(
+ node.getWritePointCount(),
+ Metric.LEADER_QUANTITY.toString(),
+ MetricLevel.CORE,
+ Tag.NAME.toString(),
+ Metric.POINTS_IN.toString(),
+ Tag.DATABASE.toString(),
+ databaseName,
+ Tag.REGION.toString(),
+ dataRegion.getDataRegionId(),
+ Tag.TYPE.toString(),
+ Metric.LOAD_POINT_COUNT.toString());
+ }));
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
index 3427361d192..3ed2823e8b2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -191,7 +191,7 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
performer.setSummary(summary);
performer.perform();
- CompactionUtils.updateProgressIndex(
+ CompactionUtils.updateProgressIndexAndMark(
targetTsfileResourceList, selectedSequenceFiles,
selectedUnsequenceFiles);
CompactionUtils.moveTargetFile(
targetTsfileResourceList,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index 0fb4b1d851a..25674209023 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -438,7 +438,7 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
}
protected void prepareTargetFiles() throws IOException {
- CompactionUtils.updateProgressIndex(
+ CompactionUtils.updateProgressIndexAndMark(
filesView.targetFilesInPerformer,
filesView.sequence ? filesView.sourceFilesInCompactionPerformer :
Collections.emptyList(),
filesView.sequence ? Collections.emptyList() :
filesView.sourceFilesInCompactionPerformer);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
index bccd7cea672..f43bd7767b2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
@@ -150,7 +150,7 @@ public class RepairUnsortedFileCompactionTask extends
InnerSpaceCompactionTask {
@Override
protected void prepareTargetFiles() throws IOException {
- CompactionUtils.updateProgressIndex(
+ CompactionUtils.updateProgressIndexAndMark(
filesView.targetFilesInPerformer,
filesView.sourceFilesInCompactionPerformer,
Collections.emptyList());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
index 5aa9143259b..afbea98ec29 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
@@ -282,16 +282,26 @@ public class CompactionUtils {
}
}
- public static void updateProgressIndex(
+ public static void updateProgressIndexAndMark(
List<TsFileResource> targetResources,
List<TsFileResource> seqResources,
List<TsFileResource> unseqResources) {
for (TsFileResource targetResource : targetResources) {
for (TsFileResource unseqResource : unseqResources) {
targetResource.updateProgressIndex(unseqResource.getMaxProgressIndexAfterClose());
+ targetResource.setGeneratedByPipe(
+ unseqResource.isGeneratedByPipe() &&
targetResource.isGeneratedByPipe());
+ targetResource.setGeneratedByPipeConsensus(
+ unseqResource.isGeneratedByPipeConsensus()
+ && targetResource.isGeneratedByPipeConsensus());
}
for (TsFileResource seqResource : seqResources) {
targetResource.updateProgressIndex(seqResource.getMaxProgressIndexAfterClose());
+ targetResource.setGeneratedByPipe(
+ seqResource.isGeneratedByPipe() &&
targetResource.isGeneratedByPipe());
+ targetResource.setGeneratedByPipeConsensus(
+ seqResource.isGeneratedByPipeConsensus()
+ && targetResource.isGeneratedByPipeConsensus());
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 5266667fb55..725408b7801 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -170,6 +170,9 @@ public class TsFileResource {
/** used to prevent circular replication in PipeConsensus */
private boolean isGeneratedByPipeConsensus = false;
+ /** used to prevent circular replication in Pipe */
+ private boolean isGeneratedByPipe = false;
+
private InsertionCompactionCandidateStatus
insertionCompactionCandidateStatus =
InsertionCompactionCandidateStatus.NOT_CHECKED;
@@ -268,6 +271,9 @@ public class TsFileResource {
} else {
TsFileResourceBlockType.EMPTY_BLOCK.serialize(outputStream);
}
+
+ ReadWriteIOUtils.write(isGeneratedByPipeConsensus, outputStream);
+ ReadWriteIOUtils.write(isGeneratedByPipe, outputStream);
}
/** deserialize from disk */
@@ -294,6 +300,11 @@ public class TsFileResource {
maxProgressIndex = ProgressIndexType.deserializeFrom(inputStream);
}
}
+
+ if (inputStream.available() > 0) {
+ isGeneratedByPipeConsensus = ReadWriteIOUtils.readBoolean(inputStream);
+ isGeneratedByPipe = ReadWriteIOUtils.readBoolean(inputStream);
+ }
}
}
@@ -556,6 +567,14 @@ public class TsFileResource {
isGeneratedByPipeConsensus = generatedByPipeConsensus;
}
+ public boolean isGeneratedByPipe() {
+ return isGeneratedByPipe;
+ }
+
+ public void setGeneratedByPipe(boolean generatedByPipe) {
+ isGeneratedByPipe = generatedByPipe;
+ }
+
public void writeLock() {
if (originTsFileResource == null) {
tsFileLock.writeLock();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index 50816ebf968..d97f2da68e6 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
@@ -557,6 +558,18 @@ public class TsFileInsertionDataContainerTest {
final long endTime,
final boolean isQuery,
final int expectedCount) {
+ PipeTsFileInsertionEvent tsFileInsertionEvent =
+ new PipeTsFileInsertionEvent(
+ new TsFileResource(tsFile),
+ true,
+ false,
+ false,
+ null,
+ 0,
+ null,
+ null,
+ Long.MIN_VALUE,
+ Long.MAX_VALUE);
try (final TsFileInsertionDataContainer tsFileContainer =
isQuery
? new TsFileInsertionQueryDataContainer(tsFile, pattern,
startTime, endTime)