This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch rc/1.2.1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a080020d7abbbf2e888806d7cf39c510bab76641 Author: HTHou <[email protected]> AuthorDate: Mon Aug 21 16:21:02 2023 +0800 Revert "[IOTDB-6101] Pipe: Support tsfile cascade transport (#10795) (#10796)" This reverts commit d8050eef07a4d947a8a52b195326b6ed4b78ce2e. --- .../listener/PipeInsertionDataNodeListener.java | 9 ++++++--- .../planner/plan/node/load/LoadSingleTsFileNode.java | 11 +++-------- .../iotdb/db/storageengine/dataregion/DataRegion.java | 5 ----- .../dataregion/memtable/TsFileProcessor.java | 2 -- .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 2 -- .../dataregion/TsFileResourceProgressIndexTest.java | 18 ------------------ 6 files changed, 9 insertions(+), 38 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java index b7a7a0c3770..8afa1831566 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.extractor.realtime.listener; +import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory; import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor; import org.apache.iotdb.db.pipe.extractor.realtime.assigner.PipeDataRegionAssigner; @@ -92,9 +93,11 @@ public class PipeInsertionDataNodeListener { //////////////////////////// listen to events //////////////////////////// public void listenToTsFile(String dataRegionId, TsFileResource tsFileResource) { - // We don't judge whether listenToTsFileExtractorCount.get() == 0 here on purpose - // because extractors may use tsfile events when some exceptions occur in the - // insert nodes listening process. + // wo don't judge whether listenToTsFileExtractorCount.get() == 0 here, because + // when using SimpleProgressIndex, the tsfile event needs to be assigned to the + // extractor even if listenToTsFileExtractorCount.get() == 0 to record the progress + + PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource); final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java index 4f3506bcb75..d536a357412 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java @@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; @@ -100,13 +99,9 @@ public class LoadSingleTsFileNode extends WritePlanNode { needDecodeTsFile = !isDispatchedToLocal(new HashSet<>(partitionFetcher.apply(slotList))); } - PipeAgent.runtime().assignRecoverProgressIndexForTsFileRecovery(resource); - - // we serialize the resource file even if the tsfile does not need to be decoded - // or the resource file is already existed because we need to serialize the - // progress index of the tsfile - resource.serialize(); - + if (!needDecodeTsFile && !resource.resourceFileExists()) { + resource.serialize(); + } return needDecodeTsFile; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 0011db21534..db5d0168d20 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -47,7 +47,6 @@ import org.apache.iotdb.db.exception.WriteProcessRejectException; import org.apache.iotdb.db.exception.query.OutOfTTLException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.quota.ExceedQuotaException; -import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache; @@ -2222,9 +2221,6 @@ public class DataRegion implements IDataRegionForQuery { } loadTsFileToUnSequence( tsfileToBeInserted, newTsFileResource, newFilePartitionId, deleteOriginFile); - - PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId, newTsFileResource); - FileMetrics.getInstance() .addFile( newTsFileResource.getTsFile().length(), @@ -2433,7 +2429,6 @@ public class DataRegion implements IDataRegionForQuery { } else { Files.copy(resourceFileToLoad.toPath(), targetResourceFile.toPath()); } - } catch (IOException e) { logger.error( "File renaming failed when loading .resource file. Origin: {}, Target: {}", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index dc7086dde70..1ded166cfc7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -31,7 +31,6 @@ import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.WriteProcessRejectException; import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; @@ -891,7 +890,6 @@ public class TsFileProcessor { IMemTable tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable; try { - PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource); PipeInsertionDataNodeListener.getInstance() .listenToTsFile(dataRegionInfo.getDataRegion().getDataRegionId(), tsFileResource); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java index 2f243b2b508..28bb8f82930 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.utils; import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet; import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache; @@ -107,7 +106,6 @@ public class FileLoaderUtils { } } resource.setStatus(TsFileResourceStatus.NORMAL); - PipeAgent.runtime().assignRecoverProgressIndexForTsFileRecovery(resource); return resource; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java index 62b9caa2f35..c5339c1b738 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java @@ -21,9 +21,6 @@ package org.apache.iotdb.db.storageengine.dataregion; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.ProgressIndexType; -import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex; -import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex; -import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; @@ -90,21 +87,6 @@ public class TsFileResourceProgressIndexTest { @Test public void testProgressIndexRecorder() { - HybridProgressIndex hybridProgressIndex = new HybridProgressIndex(); - hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new SimpleProgressIndex(3, 4)); - hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new SimpleProgressIndex(6, 6)); - hybridProgressIndex.updateToMinimumIsAfterProgressIndex( - new RecoverProgressIndex(1, new SimpleProgressIndex(1, 2))); - hybridProgressIndex.updateToMinimumIsAfterProgressIndex( - new RecoverProgressIndex(1, new SimpleProgressIndex(1, 3))); - hybridProgressIndex.updateToMinimumIsAfterProgressIndex( - new RecoverProgressIndex(2, new SimpleProgressIndex(4, 3))); - hybridProgressIndex.updateToMinimumIsAfterProgressIndex( - new RecoverProgressIndex(3, new SimpleProgressIndex(5, 5))); - Assert.assertTrue(hybridProgressIndex.isAfter(new SimpleProgressIndex(6, 5))); - Assert.assertTrue( - hybridProgressIndex.isAfter(new RecoverProgressIndex(3, new SimpleProgressIndex(5, 4)))); - Assert.assertTrue( new MockProgressIndex(0).isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
