This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b3a4bdf813f [IOTDB-6101] Pipe: Support tsfile cascade transport
(#10795)
b3a4bdf813f is described below
commit b3a4bdf813f7f307a4a8b06aab73432da773dcda
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sun Aug 6 21:02:30 2023 +0800
[IOTDB-6101] Pipe: Support tsfile cascade transport (#10795)
Support cascaded tsfile transport. For example, given three IoTDB clusters
A, B and C, a pipe can now transport tsfiles from A to C via B
(A -> B -> C).
---
.../listener/PipeInsertionDataNodeListener.java | 9 +++------
.../planner/plan/node/load/LoadSingleTsFileNode.java | 11 ++++++++---
.../iotdb/db/storageengine/dataregion/DataRegion.java | 5 +++++
.../dataregion/memtable/TsFileProcessor.java | 2 ++
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 2 ++
.../dataregion/TsFileResourceProgressIndexTest.java | 18 ++++++++++++++++++
6 files changed, 38 insertions(+), 9 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
index 8afa1831566..b7a7a0c3770 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.pipe.extractor.realtime.listener;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory;
import
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.realtime.assigner.PipeDataRegionAssigner;
@@ -93,11 +92,9 @@ public class PipeInsertionDataNodeListener {
//////////////////////////// listen to events ////////////////////////////
public void listenToTsFile(String dataRegionId, TsFileResource
tsFileResource) {
- // wo don't judge whether listenToTsFileExtractorCount.get() == 0 here,
because
- // when using SimpleProgressIndex, the tsfile event needs to be assigned
to the
- // extractor even if listenToTsFileExtractorCount.get() == 0 to record the
progress
-
- PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource);
+ // We deliberately do not check whether listenToTsFileExtractorCount.get() == 0
+ // here, because extractors may use tsfile events when exceptions occur in
+ // the insert node listening process.
final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(dataRegionId);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index d536a357412..4f3506bcb75 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -99,9 +100,13 @@ public class LoadSingleTsFileNode extends WritePlanNode {
needDecodeTsFile = !isDispatchedToLocal(new
HashSet<>(partitionFetcher.apply(slotList)));
}
- if (!needDecodeTsFile && !resource.resourceFileExists()) {
- resource.serialize();
- }
+ PipeAgent.runtime().assignRecoverProgressIndexForTsFileRecovery(resource);
+
+ // We serialize the resource file even if the tsfile does not need to be
+ // decoded or the resource file already exists, because we need to serialize
+ // the progress index of the tsfile.
+ resource.serialize();
+
return needDecodeTsFile;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index db5d0168d20..0011db21534 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -47,6 +47,7 @@ import
org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.quota.ExceedQuotaException;
+import
org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
@@ -2221,6 +2222,9 @@ public class DataRegion implements IDataRegionForQuery {
}
loadTsFileToUnSequence(
tsfileToBeInserted, newTsFileResource, newFilePartitionId,
deleteOriginFile);
+
+ PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId,
newTsFileResource);
+
FileMetrics.getInstance()
.addFile(
newTsFileResource.getTsFile().length(),
@@ -2429,6 +2433,7 @@ public class DataRegion implements IDataRegionForQuery {
} else {
Files.copy(resourceFileToLoad.toPath(), targetResourceFile.toPath());
}
+
} catch (IOException e) {
logger.error(
"File renaming failed when loading .resource file. Origin: {},
Target: {}",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index c82cc639434..52352408b19 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import
org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
@@ -862,6 +863,7 @@ public class TsFileProcessor {
IMemTable tmpMemTable = workMemTable == null ? new NotifyFlushMemTable()
: workMemTable;
try {
+ PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource);
PipeInsertionDataNodeListener.getInstance()
.listenToTsFile(dataRegionInfo.getDataRegion().getDataRegionId(),
tsFileResource);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 28bb8f82930..2f243b2b508 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
@@ -106,6 +107,7 @@ public class FileLoaderUtils {
}
}
resource.setStatus(TsFileResourceStatus.NORMAL);
+ PipeAgent.runtime().assignRecoverProgressIndexForTsFileRecovery(resource);
return resource;
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
index c5339c1b738..62b9caa2f35 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.storageengine.dataregion;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
@@ -87,6 +90,21 @@ public class TsFileResourceProgressIndexTest {
@Test
public void testProgressIndexRecorder() {
+ HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
+ hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new
SimpleProgressIndex(3, 4));
+ hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new
SimpleProgressIndex(6, 6));
+ hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
+ new RecoverProgressIndex(1, new SimpleProgressIndex(1, 2)));
+ hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
+ new RecoverProgressIndex(1, new SimpleProgressIndex(1, 3)));
+ hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
+ new RecoverProgressIndex(2, new SimpleProgressIndex(4, 3)));
+ hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
+ new RecoverProgressIndex(3, new SimpleProgressIndex(5, 5)));
+ Assert.assertTrue(hybridProgressIndex.isAfter(new SimpleProgressIndex(6,
5)));
+ Assert.assertTrue(
+ hybridProgressIndex.isAfter(new RecoverProgressIndex(3, new
SimpleProgressIndex(5, 4))));
+
Assert.assertTrue(
new
MockProgressIndex(0).isAfter(tsFileResource.getMaxProgressIndexAfterClose()));