This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch load_v2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bda76dffe0fb87ec080a2c4884f94d9610a0293c Author: Tian Jiang <[email protected]> AuthorDate: Mon Oct 30 09:31:15 2023 +0800 integrate historical extractor --- .../common/tsfile/PipeBatchTsFileInsertionEvent.java | 20 ++++++++++++++++---- .../db/pipe/extractor/IoTDBDataRegionExtractor.java | 18 +++++++++++++++--- .../extractor/historical/BatchedTsFileExtractor.java | 1 + 3 files changed, 32 insertions(+), 7 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeBatchTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeBatchTsFileInsertionEvent.java index 222532c5673..72d8b80a292 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeBatchTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeBatchTsFileInsertionEvent.java @@ -53,18 +53,21 @@ public class PipeBatchTsFileInsertionEvent extends EnrichedEvent private final List<TsFileResource> resources; private List<File> tsFiles; + private final boolean isLoaded; private final boolean isGeneratedByPipe; private final AtomicBoolean[] isClosed; private TsFileListInsertionDataContainer dataContainer; - public PipeBatchTsFileInsertionEvent(List<TsFileResource> resources, boolean isGeneratedByPipe) { - this(resources, isGeneratedByPipe, null, null, Long.MIN_VALUE, Long.MAX_VALUE, false); + public PipeBatchTsFileInsertionEvent( + List<TsFileResource> resources, boolean isLoaded, boolean isGeneratedByPipe) { + this(resources, isLoaded, isGeneratedByPipe, null, null, Long.MIN_VALUE, Long.MAX_VALUE, false); } public PipeBatchTsFileInsertionEvent( List<TsFileResource> resources, + boolean isLoaded, boolean isGeneratedByPipe, PipeTaskMeta pipeTaskMeta, String pattern, @@ -78,13 +81,14 @@ public class PipeBatchTsFileInsertionEvent extends EnrichedEvent this.needParseTime = needParseTime; if (needParseTime) { - this.isPatternAndTimeParsed = false; + this.isTimeParsed = false; } this.resources = resources; tsFiles = resources.stream().map(TsFileResource::getTsFile).collect(Collectors.toList()); isClosed = new AtomicBoolean[resources.size()]; + this.isLoaded = isLoaded; this.isGeneratedByPipe = isGeneratedByPipe; for (int i = 0; i < isClosed.length; i++) { @@ -181,7 +185,14 @@ public class PipeBatchTsFileInsertionEvent extends EnrichedEvent public PipeBatchTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( PipeTaskMeta pipeTaskMeta, String pattern) { return new PipeBatchTsFileInsertionEvent( - resources, isGeneratedByPipe, pipeTaskMeta, pattern, startTime, endTime, needParseTime); + resources, + isLoaded, + isGeneratedByPipe, + pipeTaskMeta, + pattern, + startTime, + endTime, + needParseTime); } @Override @@ -247,6 +258,7 @@ public class PipeBatchTsFileInsertionEvent extends EnrichedEvent result.add( new PipeTsFileInsertionEvent( resource, + isLoaded, isGeneratedByPipe, pipeTaskMeta, getPattern(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java index c68e3047404..927858421d5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.extractor; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; +import org.apache.iotdb.db.pipe.extractor.historical.BatchedTsFileExtractor; import org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionExtractor; import org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionTsFileExtractor; import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor; @@ -48,6 +49,9 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOCAL_SPLIT_ENABLE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE; @@ -120,16 +124,24 @@ public class IoTDBDataRegionExtractor implements PipeExtractor { EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE); } - constructHistoricalExtractor(); + constructHistoricalExtractor(validator.getParameters()); constructRealtimeExtractor(validator.getParameters()); historicalExtractor.validate(validator); realtimeExtractor.validate(validator); } - private void constructHistoricalExtractor() { + private void constructHistoricalExtractor(PipeParameters parameters) { // Enable historical extractor by default - historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor(); + if (parameters.getBooleanOrDefault(CONNECTOR_LOCAL_SPLIT_ENABLE_KEY, false)) { + historicalExtractor = + new BatchedTsFileExtractor( + parameters.getIntOrDefault( + CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_KEY, + CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE)); + } else { + historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor(); + } } private void constructRealtimeExtractor(PipeParameters parameters) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/BatchedTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/BatchedTsFileExtractor.java index 441b74705ab..3a5deeda141 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/BatchedTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/BatchedTsFileExtractor.java @@ -62,6 +62,7 @@ public class BatchedTsFileExtractor extends PipeHistoricalDataRegionTsFileExtrac new PipeBatchTsFileInsertionEvent( tsFileResourceList, false, + false, pipeTaskMeta, pattern, historicalDataExtractionStartTime,
