This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch cascade-tsfile-trans in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 91eec524ffea0058fb26e93178e2fcc5bfc8ae71 Author: Steve Yurong Su <[email protected]> AuthorDate: Sun Aug 6 18:30:51 2023 +0800 cascade --- .../planner/plan/node/load/LoadSingleTsFileNode.java | 11 ++++++++--- .../iotdb/db/storageengine/dataregion/DataRegion.java | 5 +++++ .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 2 ++ .../dataregion/TsFileResourceProgressIndexTest.java | 18 ++++++++++++++++++ 4 files changed, 33 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java index d536a357412..4f3506bcb75 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java @@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; @@ -99,9 +100,13 @@ public class LoadSingleTsFileNode extends WritePlanNode { needDecodeTsFile = !isDispatchedToLocal(new HashSet<>(partitionFetcher.apply(slotList))); } - if (!needDecodeTsFile && !resource.resourceFileExists()) { - resource.serialize(); - } + PipeAgent.runtime().assignRecoverProgressIndexForTsFileRecovery(resource); + + // we serialize the resource file even if the tsfile does not need to be decoded + // or the resource file is already existed because we need to serialize the + // progress index of the tsfile + resource.serialize(); + return needDecodeTsFile; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index db5d0168d20..0011db21534 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -47,6 +47,7 @@ import org.apache.iotdb.db.exception.WriteProcessRejectException; import org.apache.iotdb.db.exception.query.OutOfTTLException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.quota.ExceedQuotaException; +import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache; @@ -2221,6 +2222,9 @@ public class DataRegion implements IDataRegionForQuery { } loadTsFileToUnSequence( tsfileToBeInserted, newTsFileResource, newFilePartitionId, deleteOriginFile); + + PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId, newTsFileResource); + FileMetrics.getInstance() .addFile( newTsFileResource.getTsFile().length(), @@ -2429,6 +2433,7 @@ public class DataRegion implements IDataRegionForQuery { } else { Files.copy(resourceFileToLoad.toPath(), targetResourceFile.toPath()); } + } catch (IOException e) { logger.error( "File renaming failed when loading .resource file. Origin: {}, Target: {}", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java index 28bb8f82930..2f243b2b508 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils; import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet; import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache; @@ -106,6 +107,7 @@ public class FileLoaderUtils { } } resource.setStatus(TsFileResourceStatus.NORMAL); + PipeAgent.runtime().assignRecoverProgressIndexForTsFileRecovery(resource); return resource; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java index c5339c1b738..055d928bcf4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java @@ -21,6 +21,9 @@ package org.apache.iotdb.db.storageengine.dataregion; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.ProgressIndexType; +import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex; +import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex; +import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; @@ -87,6 +90,21 @@ public class TsFileResourceProgressIndexTest { @Test public void testProgressIndexRecorder() { + HybridProgressIndex hybridProgressIndex = new HybridProgressIndex(); + hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new SimpleProgressIndex(3, 4)); + hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new SimpleProgressIndex(6, 6)); + hybridProgressIndex.updateToMinimumIsAfterProgressIndex( + new RecoverProgressIndex(1, new SimpleProgressIndex(1, 2))); + hybridProgressIndex.updateToMinimumIsAfterProgressIndex( + new RecoverProgressIndex(1, new SimpleProgressIndex(1, 3))); + hybridProgressIndex.updateToMinimumIsAfterProgressIndex( + new RecoverProgressIndex(2, new SimpleProgressIndex(4, 3))); + hybridProgressIndex.updateToMinimumIsAfterProgressIndex( + new RecoverProgressIndex(3, new SimpleProgressIndex(5, 3))); + Assert.assertTrue(hybridProgressIndex.isAfter(new SimpleProgressIndex(6, 5))); + Assert.assertTrue( + hybridProgressIndex.isAfter(new RecoverProgressIndex(3, new SimpleProgressIndex(5, 4)))); + Assert.assertTrue( new MockProgressIndex(0).isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
