This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch load_v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bda76dffe0fb87ec080a2c4884f94d9610a0293c
Author: Tian Jiang <[email protected]>
AuthorDate: Mon Oct 30 09:31:15 2023 +0800

    integrate historical extractor
---
 .../common/tsfile/PipeBatchTsFileInsertionEvent.java | 20 ++++++++++++++++----
 .../db/pipe/extractor/IoTDBDataRegionExtractor.java  | 18 +++++++++++++++---
 .../extractor/historical/BatchedTsFileExtractor.java |  1 +
 3 files changed, 32 insertions(+), 7 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeBatchTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeBatchTsFileInsertionEvent.java
index 222532c5673..72d8b80a292 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeBatchTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeBatchTsFileInsertionEvent.java
@@ -53,18 +53,21 @@ public class PipeBatchTsFileInsertionEvent extends EnrichedEvent
   private final List<TsFileResource> resources;
   private List<File> tsFiles;
 
+  private final boolean isLoaded;
   private final boolean isGeneratedByPipe;
 
   private final AtomicBoolean[] isClosed;
 
   private TsFileListInsertionDataContainer dataContainer;
 
-  public PipeBatchTsFileInsertionEvent(List<TsFileResource> resources, boolean isGeneratedByPipe) {
-    this(resources, isGeneratedByPipe, null, null, Long.MIN_VALUE, Long.MAX_VALUE, false);
+  public PipeBatchTsFileInsertionEvent(
+      List<TsFileResource> resources, boolean isLoaded, boolean isGeneratedByPipe) {
+    this(resources, isLoaded, isGeneratedByPipe, null, null, Long.MIN_VALUE, Long.MAX_VALUE, false);
   }
 
   public PipeBatchTsFileInsertionEvent(
       List<TsFileResource> resources,
+      boolean isLoaded,
       boolean isGeneratedByPipe,
       PipeTaskMeta pipeTaskMeta,
       String pattern,
@@ -78,13 +81,14 @@ public class PipeBatchTsFileInsertionEvent extends EnrichedEvent
     this.needParseTime = needParseTime;
 
     if (needParseTime) {
-      this.isPatternAndTimeParsed = false;
+      this.isTimeParsed = false;
     }
 
     this.resources = resources;
     tsFiles = resources.stream().map(TsFileResource::getTsFile).collect(Collectors.toList());
     isClosed = new AtomicBoolean[resources.size()];
 
+    this.isLoaded = isLoaded;
     this.isGeneratedByPipe = isGeneratedByPipe;
 
     for (int i = 0; i < isClosed.length; i++) {
@@ -181,7 +185,14 @@ public class PipeBatchTsFileInsertionEvent extends EnrichedEvent
   public PipeBatchTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       PipeTaskMeta pipeTaskMeta, String pattern) {
     return new PipeBatchTsFileInsertionEvent(
-        resources, isGeneratedByPipe, pipeTaskMeta, pattern, startTime, endTime, needParseTime);
+        resources,
+        isLoaded,
+        isGeneratedByPipe,
+        pipeTaskMeta,
+        pattern,
+        startTime,
+        endTime,
+        needParseTime);
   }
 
   @Override
@@ -247,6 +258,7 @@ public class PipeBatchTsFileInsertionEvent extends EnrichedEvent
       result.add(
           new PipeTsFileInsertionEvent(
               resource,
+              isLoaded,
               isGeneratedByPipe,
               pipeTaskMeta,
               getPattern(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index c68e3047404..927858421d5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.extractor;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import org.apache.iotdb.db.pipe.extractor.historical.BatchedTsFileExtractor;
 import org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionExtractor;
 import org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionTsFileExtractor;
 import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
@@ -48,6 +49,9 @@ import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOCAL_SPLIT_ENABLE_KEY;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE;
+import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_KEY;
 import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
 import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
 import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
@@ -120,16 +124,24 @@ public class IoTDBDataRegionExtractor implements PipeExtractor {
           EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE);
     }
 
-    constructHistoricalExtractor();
+    constructHistoricalExtractor(validator.getParameters());
     constructRealtimeExtractor(validator.getParameters());
 
     historicalExtractor.validate(validator);
     realtimeExtractor.validate(validator);
   }
 
-  private void constructHistoricalExtractor() {
+  private void constructHistoricalExtractor(PipeParameters parameters) {
     // Enable historical extractor by default
-    historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor();
+    if (parameters.getBooleanOrDefault(CONNECTOR_LOCAL_SPLIT_ENABLE_KEY, false)) {
+      historicalExtractor =
+          new BatchedTsFileExtractor(
+              parameters.getIntOrDefault(
+                  CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_KEY,
+                  CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE));
+    } else {
+      historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor();
+    }
   }
 
   private void constructRealtimeExtractor(PipeParameters parameters) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/BatchedTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/BatchedTsFileExtractor.java
index 441b74705ab..3a5deeda141 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/BatchedTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/BatchedTsFileExtractor.java
@@ -62,6 +62,7 @@ public class BatchedTsFileExtractor extends PipeHistoricalDataRegionTsFileExtrac
         new PipeBatchTsFileInsertionEvent(
             tsFileResourceList,
             false,
+            false,
             pipeTaskMeta,
             pattern,
             historicalDataExtractionStartTime,

Reply via email to