This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-table-model-3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pipe-table-model-3 by this 
push:
     new 9048640b55e IoTDBDataRegionTableModelExtractor impl the same with 
IoTDBDataRegionExtractor
9048640b55e is described below

commit 9048640b55e974aa13e8fc66fe0d11dbca316b94
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Sep 26 18:10:38 2024 +0800

    IoTDBDataRegionTableModelExtractor impl the same with 
IoTDBDataRegionExtractor
---
 .../dataregion/IoTDBDataRegionExtractor.java       |  12 +-
 .../IoTDBDataRegionTableModelExtractor.java        | 547 ++++++++++++++++++++-
 2 files changed, 552 insertions(+), 7 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index f05bcf2bea5..7a3397c9ef4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -93,14 +93,14 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBDataRegionExtractor.class);
 
-  private PipeHistoricalDataRegionExtractor historicalExtractor;
-  private PipeRealtimeDataRegionExtractor realtimeExtractor;
+  protected PipeHistoricalDataRegionExtractor historicalExtractor;
+  protected PipeRealtimeDataRegionExtractor realtimeExtractor;
 
-  private DataRegionWatermarkInjector watermarkInjector;
+  protected DataRegionWatermarkInjector watermarkInjector;
 
-  private boolean hasNoExtractionNeed = true;
-  private boolean shouldExtractInsertion = false;
-  private boolean shouldExtractDeletion = false;
+  protected boolean hasNoExtractionNeed = true;
+  protected boolean shouldExtractInsertion = false;
+  protected boolean shouldExtractDeletion = false;
 
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
index a8c35b78333..fa23d12a2e5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
@@ -19,6 +19,551 @@
 
 package org.apache.iotdb.db.pipe.table.extractor.dataregion;
 
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
+import 
org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionWatermarkInjector;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
+import 
org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionTsFileExtractor;
+import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHeartbeatExtractor;
+import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
+import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
+import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
+import 
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.PipeDataRegionExtractorMetrics;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
-public class IoTDBDataRegionTableModelExtractor extends 
IoTDBDataRegionExtractor {}
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_EXCLUSION_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_INCLUSION_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_EXCLUSION_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_INCLUSION_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_FORMAT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_WATERMARK_INTERVAL_KEY;
+import static 
org.apache.iotdb.commons.pipe.datastructure.options.PipeInclusionOptions.hasAtLeastOneOption;
+import static 
org.apache.iotdb.commons.pipe.datastructure.options.PipeInclusionOptions.optionsAreAllLegal;
+
+public class IoTDBDataRegionTableModelExtractor extends 
IoTDBDataRegionExtractor {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(IoTDBDataRegionTableModelExtractor.class);
+
+  @Override
+  public void validate(final PipeParameterValidator validator) throws 
Exception {
+    validator
+        .validate(
+            args -> optionsAreAllLegal((String) args),
+            "The 'inclusion' string contains illegal path.",
+            validator
+                .getParameters()
+                .getStringOrDefault(
+                    Arrays.asList(EXTRACTOR_INCLUSION_KEY, 
SOURCE_INCLUSION_KEY),
+                    EXTRACTOR_INCLUSION_DEFAULT_VALUE))
+        .validate(
+            args -> optionsAreAllLegal((String) args),
+            "The 'inclusion.exclusion' string contains illegal path.",
+            validator
+                .getParameters()
+                .getStringOrDefault(
+                    Arrays.asList(EXTRACTOR_EXCLUSION_KEY, 
SOURCE_EXCLUSION_KEY),
+                    EXTRACTOR_EXCLUSION_DEFAULT_VALUE))
+        .validate(
+            args -> hasAtLeastOneOption((String) args[0], (String) args[1]),
+            "The pipe inclusion content can't be empty.",
+            validator
+                .getParameters()
+                .getStringOrDefault(
+                    Arrays.asList(EXTRACTOR_INCLUSION_KEY, 
SOURCE_INCLUSION_KEY),
+                    EXTRACTOR_INCLUSION_DEFAULT_VALUE),
+            validator
+                .getParameters()
+                .getStringOrDefault(
+                    Arrays.asList(EXTRACTOR_EXCLUSION_KEY, 
SOURCE_EXCLUSION_KEY),
+                    EXTRACTOR_EXCLUSION_DEFAULT_VALUE));
+
+    final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
+        DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
+            validator.getParameters());
+    if (insertionDeletionListeningOptionPair.getLeft().equals(false)
+        && insertionDeletionListeningOptionPair.getRight().equals(false)) {
+      return;
+    }
+    hasNoExtractionNeed = false;
+    shouldExtractInsertion = insertionDeletionListeningOptionPair.getLeft();
+    shouldExtractDeletion = insertionDeletionListeningOptionPair.getRight();
+
+    if (insertionDeletionListeningOptionPair.getLeft().equals(true)
+        && IoTDBDescriptor.getInstance()
+            .getConfig()
+            .getDataRegionConsensusProtocolClass()
+            .equals(ConsensusFactory.RATIS_CONSENSUS)) {
+      throw new PipeException(
+          "The pipe cannot transfer data when data region is using ratis 
consensus.");
+    }
+
+    // Validate extractor.pattern.format is within valid range
+    validator
+        .validateAttributeValueRange(
+            EXTRACTOR_PATTERN_FORMAT_KEY,
+            true,
+            EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE,
+            EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE)
+        .validateAttributeValueRange(
+            SOURCE_PATTERN_FORMAT_KEY,
+            true,
+            EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE,
+            EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE);
+
+    // Get the pattern format to check whether the pattern is legal
+    final PipePattern pattern =
+        
PipePattern.parsePipePatternFromSourceParameters(validator.getParameters());
+
+    // Check whether the pattern is legal
+    validatePattern(pattern);
+
+    // Validate extractor.history.enable and extractor.realtime.enable
+    validator
+        .validateAttributeValueRange(
+            EXTRACTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
+        .validateAttributeValueRange(
+            EXTRACTOR_REALTIME_ENABLE_KEY, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
+        .validateAttributeValueRange(
+            SOURCE_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
+        .validateAttributeValueRange(
+            SOURCE_REALTIME_ENABLE_KEY, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
+        .validate(
+            args -> (boolean) args[0] || (boolean) args[1],
+            "Should not set both history.enable and realtime.enable to false.",
+            validator
+                .getParameters()
+                .getBooleanOrDefault(
+                    Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
+                    EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE),
+            validator
+                .getParameters()
+                .getBooleanOrDefault(
+                    Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
+                    EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE));
+
+    // Validate extractor.realtime.mode
+    if (validator
+            .getParameters()
+            .getBooleanOrDefault(
+                Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
+                EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)
+        || validator.getParameters().hasAnyAttributes(SOURCE_START_TIME_KEY, 
SOURCE_END_TIME_KEY)) {
+      validator.validateAttributeValueRange(
+          validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY)
+              ? EXTRACTOR_REALTIME_MODE_KEY
+              : SOURCE_REALTIME_MODE_KEY,
+          true,
+          EXTRACTOR_REALTIME_MODE_FILE_VALUE,
+          EXTRACTOR_REALTIME_MODE_HYBRID_VALUE,
+          EXTRACTOR_REALTIME_MODE_LOG_VALUE,
+          EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE,
+          EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE,
+          EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE);
+    }
+
+    // Validate source.start-time and source.end-time
+    if (validator.getParameters().hasAnyAttributes(SOURCE_START_TIME_KEY, 
SOURCE_END_TIME_KEY)
+        && validator
+            .getParameters()
+            .hasAnyAttributes(
+                EXTRACTOR_HISTORY_ENABLE_KEY,
+                EXTRACTOR_REALTIME_ENABLE_KEY,
+                SOURCE_HISTORY_ENABLE_KEY,
+                SOURCE_REALTIME_ENABLE_KEY)) {
+      LOGGER.warn(
+          "When {}, {}, {} or {} is specified, specifying {}, {}, {} and {} is 
invalid.",
+          SOURCE_START_TIME_KEY,
+          EXTRACTOR_START_TIME_KEY,
+          SOURCE_END_TIME_KEY,
+          EXTRACTOR_END_TIME_KEY,
+          SOURCE_HISTORY_START_TIME_KEY,
+          EXTRACTOR_HISTORY_START_TIME_KEY,
+          SOURCE_HISTORY_END_TIME_KEY,
+          EXTRACTOR_HISTORY_END_TIME_KEY);
+    }
+
+    constructHistoricalExtractor();
+    constructRealtimeExtractor(validator.getParameters());
+
+    historicalExtractor.validate(validator);
+    realtimeExtractor.validate(validator);
+  }
+
+  private void validatePattern(final PipePattern pattern) {
+    if (!pattern.isLegal()) {
+      throw new IllegalArgumentException(String.format("Pattern \"%s\" is 
illegal.", pattern));
+    }
+
+    if (shouldExtractDeletion
+        && !(pattern instanceof IoTDBPipePattern
+            && (((IoTDBPipePattern) pattern).isPrefix()
+                || ((IoTDBPipePattern) pattern).isFullPath()))) {
+      throw new IllegalArgumentException(
+          String.format(
+              "The path pattern %s is not valid for the source. Only prefix or 
full path is allowed.",
+              pattern));
+    }
+  }
+
+  private void constructHistoricalExtractor() {
+    // Enable historical extractor by default
+    historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor();
+  }
+
+  private void constructRealtimeExtractor(final PipeParameters parameters)
+      throws IllegalPathException {
+    // Use heartbeat only extractor if disable realtime extractor
+    if (!parameters.getBooleanOrDefault(
+        Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
+        EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
+      realtimeExtractor = new PipeRealtimeDataRegionHeartbeatExtractor();
+      LOGGER.info(
+          "Pipe: '{}' is set to false, use heartbeat realtime extractor.",
+          EXTRACTOR_REALTIME_ENABLE_KEY);
+      return;
+    }
+
+    // Use heartbeat only extractor if enable snapshot mode
+    final String extractorModeValue =
+        parameters.getStringOrDefault(
+            Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), 
EXTRACTOR_MODE_DEFAULT_VALUE);
+    if (extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE)
+        || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)) 
{
+      realtimeExtractor = new PipeRealtimeDataRegionHeartbeatExtractor();
+      LOGGER.info(
+          "Pipe: '{}' is set to {}, use heartbeat realtime extractor.",
+          EXTRACTOR_MODE_KEY,
+          EXTRACTOR_MODE_SNAPSHOT_VALUE);
+      return;
+    }
+
+    // Use hybrid mode by default
+    if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY)) {
+      checkWalEnable(parameters);
+      realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
+      LOGGER.info(
+          "Pipe: '{}' is not set, use hybrid mode by default.", 
EXTRACTOR_REALTIME_MODE_KEY);
+      return;
+    }
+
+    switch (parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY)) {
+      case EXTRACTOR_REALTIME_MODE_FILE_VALUE:
+      case EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE:
+        realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
+        break;
+      case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE:
+      case EXTRACTOR_REALTIME_MODE_LOG_VALUE:
+      case EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE:
+        checkWalEnable(parameters);
+        realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
+        break;
+      case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE:
+        checkWalEnable(parameters);
+        realtimeExtractor = new PipeRealtimeDataRegionLogExtractor();
+        break;
+      default:
+        checkWalEnable(parameters);
+        realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
+        if (LOGGER.isWarnEnabled()) {
+          LOGGER.warn(
+              "Pipe: Unsupported extractor realtime mode: {}, create a hybrid 
extractor.",
+              parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY));
+        }
+    }
+  }
+
+  private void checkWalEnable(final PipeParameters parameters) throws 
IllegalPathException {
+    if (Boolean.TRUE.equals(
+            
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters)
+                .getLeft())
+        && 
IoTDBDescriptor.getInstance().getConfig().getWalMode().equals(WALMode.DISABLE)) 
{
+      throw new PipeException(
+          "The pipe cannot transfer realtime insertion if data region disables 
wal. Please set 'realtime.mode'='batch' in source parameters when enabling 
realtime transmission.");
+    }
+  }
+
+  @Override
+  public void customize(
+      final PipeParameters parameters, final PipeExtractorRuntimeConfiguration 
configuration)
+      throws Exception {
+    if (hasNoExtractionNeed) {
+      return;
+    }
+
+    final PipeTaskExtractorRuntimeEnvironment environment =
+        ((PipeTaskExtractorRuntimeEnvironment) 
configuration.getRuntimeEnvironment());
+    regionId = environment.getRegionId();
+    pipeName = environment.getPipeName();
+    creationTime = environment.getCreationTime();
+    taskID = pipeName + "_" + regionId + "_" + creationTime;
+    pipeTaskMeta = environment.getPipeTaskMeta();
+
+    isForwardingPipeRequests =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(
+                PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+                PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
+            
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+
+    historicalExtractor.customize(parameters, configuration);
+    realtimeExtractor.customize(parameters, configuration);
+
+    // Set watermark injector
+    if (parameters.hasAnyAttributes(
+        EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY)) {
+      final long watermarkIntervalInMs =
+          parameters.getLongOrDefault(
+              Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY, 
SOURCE_WATERMARK_INTERVAL_KEY),
+              EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE);
+      if (watermarkIntervalInMs > 0) {
+        watermarkInjector = new DataRegionWatermarkInjector(regionId, 
watermarkIntervalInMs);
+        LOGGER.info(
+            "Pipe {}@{}: Set watermark injector with interval {} ms.",
+            pipeName,
+            regionId,
+            watermarkInjector.getInjectionIntervalInMs());
+      }
+    }
+
+    // register metric after generating taskID
+    PipeDataRegionExtractorMetrics.getInstance().register(this);
+    PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
+  }
+
+  @Override
+  public void start() throws Exception {
+    if (hasNoExtractionNeed || hasBeenStarted.get()) {
+      return;
+    }
+
+    final long startTime = System.currentTimeMillis();
+    LOGGER.info(
+        "Pipe {}@{}: Starting historical extractor {} and realtime extractor 
{}.",
+        pipeName,
+        regionId,
+        historicalExtractor.getClass().getSimpleName(),
+        realtimeExtractor.getClass().getSimpleName());
+
+    if (hasBeenStarted.get()) {
+      return;
+    }
+    hasBeenStarted.set(true);
+
+    final AtomicReference<Exception> exceptionHolder = new 
AtomicReference<>(null);
+    final DataRegionId dataRegionIdObject = new DataRegionId(this.regionId);
+    while (true) {
+      // try to start extractors in the data region ...
+      // first try to run if data region exists, then try to run if data 
region does not exist.
+      // both conditions fail is not common, which means the data region is 
created during the
+      // runIfPresent and runIfAbsent operations. in this case, we need to 
retry.
+      if (StorageEngine.getInstance()
+              .runIfPresent(
+                  dataRegionIdObject,
+                  (dataRegion -> {
+                    dataRegion.writeLock(
+                        String.format(
+                            "Pipe: starting %s", 
IoTDBDataRegionExtractor.class.getName()));
+                    try {
+                      
startHistoricalExtractorAndRealtimeExtractor(exceptionHolder);
+                    } finally {
+                      dataRegion.writeUnlock();
+                    }
+                  }))
+          || StorageEngine.getInstance()
+              .runIfAbsent(
+                  dataRegionIdObject,
+                  () -> 
startHistoricalExtractorAndRealtimeExtractor(exceptionHolder))) {
+        rethrowExceptionIfAny(exceptionHolder);
+
+        LOGGER.info(
+            "Pipe {}@{}: Started historical extractor {} and realtime 
extractor {} successfully within {} ms.",
+            pipeName,
+            regionId,
+            historicalExtractor.getClass().getSimpleName(),
+            realtimeExtractor.getClass().getSimpleName(),
+            System.currentTimeMillis() - startTime);
+        return;
+      }
+      rethrowExceptionIfAny(exceptionHolder);
+    }
+  }
+
+  private void startHistoricalExtractorAndRealtimeExtractor(
+      final AtomicReference<Exception> exceptionHolder) {
+    try {
+      // Start realtimeExtractor first to avoid losing data. This may cause 
some
+      // retransmission, yet it is OK according to the idempotency of IoTDB.
+      // Note: The order of historical collection is flushing data -> adding 
all tsFile events.
+      // There can still be writing when tsFile events are added. If we start
+      // realtimeExtractor after the process, then this part of data will be 
lost.
+      realtimeExtractor.start();
+      historicalExtractor.start();
+    } catch (final Exception e) {
+      exceptionHolder.set(e);
+      LOGGER.warn(
+          "Pipe {}@{}: Start historical extractor {} and realtime extractor {} 
error.",
+          pipeName,
+          regionId,
+          historicalExtractor.getClass().getSimpleName(),
+          realtimeExtractor.getClass().getSimpleName(),
+          e);
+    }
+  }
+
+  private void rethrowExceptionIfAny(final AtomicReference<Exception> 
exceptionHolder) {
+    if (exceptionHolder.get() != null) {
+      throw new PipeException("failed to start extractors.", 
exceptionHolder.get());
+    }
+  }
+
+  @Override
+  public Event supply() throws Exception {
+    if (hasNoExtractionNeed) {
+      return null;
+    }
+
+    Event event = null;
+    if (!historicalExtractor.hasConsumedAll()) {
+      event = historicalExtractor.supply();
+    } else {
+      if (Objects.nonNull(watermarkInjector)) {
+        event = watermarkInjector.inject();
+      }
+      if (Objects.isNull(event)) {
+        event = realtimeExtractor.supply();
+      }
+    }
+
+    if (Objects.nonNull(event)) {
+      if (event instanceof TabletInsertionEvent) {
+        PipeDataRegionExtractorMetrics.getInstance().markTabletEvent(taskID);
+      } else if (event instanceof TsFileInsertionEvent) {
+        PipeDataRegionExtractorMetrics.getInstance().markTsFileEvent(taskID);
+      } else if (event instanceof PipeHeartbeatEvent) {
+        
PipeDataRegionExtractorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
+      }
+    }
+
+    return event;
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (hasNoExtractionNeed || !hasBeenStarted.get()) {
+      return;
+    }
+
+    historicalExtractor.close();
+    realtimeExtractor.close();
+    if (Objects.nonNull(taskID)) {
+      PipeDataRegionExtractorMetrics.getInstance().deregister(taskID);
+    }
+  }
+
+  //////////////////////////// APIs provided for detecting stuck 
////////////////////////////
+
+  @Override
+  public boolean shouldExtractInsertion() {
+    return shouldExtractInsertion;
+  }
+
+  @Override
+  public boolean isStreamMode() {
+    return realtimeExtractor instanceof PipeRealtimeDataRegionHybridExtractor
+        || realtimeExtractor instanceof PipeRealtimeDataRegionLogExtractor;
+  }
+
+  @Override
+  public boolean hasConsumedAllHistoricalTsFiles() {
+    return historicalExtractor.hasConsumedAll();
+  }
+
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
+  @Override
+  public int getHistoricalTsFileInsertionEventCount() {
+    return hasBeenStarted.get() ? historicalExtractor.getPendingQueueSize() : 
0;
+  }
+
+  @Override
+  public int getTabletInsertionEventCount() {
+    return hasBeenStarted.get() ? 
realtimeExtractor.getTabletInsertionEventCount() : 0;
+  }
+
+  @Override
+  public int getRealtimeTsFileInsertionEventCount() {
+    return hasBeenStarted.get() ? 
realtimeExtractor.getTsFileInsertionEventCount() : 0;
+  }
+
+  @Override
+  public int getPipeHeartbeatEventCount() {
+    return hasBeenStarted.get() ? 
realtimeExtractor.getPipeHeartbeatEventCount() : 0;
+  }
+
+  @Override
+  public int getEventCount() {
+    return hasBeenStarted.get()
+        ? (historicalExtractor.getPendingQueueSize() + 
realtimeExtractor.getEventCount())
+        : 0;
+  }
+}

Reply via email to