This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch pipe-table-model-3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipe-table-model-3 by this
push:
new 9048640b55e IoTDBDataRegionTableModelExtractor impl the same with
IoTDBDataRegionExtractor
9048640b55e is described below
commit 9048640b55e974aa13e8fc66fe0d11dbca316b94
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Sep 26 18:10:38 2024 +0800
IoTDBDataRegionTableModelExtractor impl the same with
IoTDBDataRegionExtractor
---
.../dataregion/IoTDBDataRegionExtractor.java | 12 +-
.../IoTDBDataRegionTableModelExtractor.java | 547 ++++++++++++++++++++-
2 files changed, 552 insertions(+), 7 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index f05bcf2bea5..7a3397c9ef4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -93,14 +93,14 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBDataRegionExtractor.class);
- private PipeHistoricalDataRegionExtractor historicalExtractor;
- private PipeRealtimeDataRegionExtractor realtimeExtractor;
+ protected PipeHistoricalDataRegionExtractor historicalExtractor;
+ protected PipeRealtimeDataRegionExtractor realtimeExtractor;
- private DataRegionWatermarkInjector watermarkInjector;
+ protected DataRegionWatermarkInjector watermarkInjector;
- private boolean hasNoExtractionNeed = true;
- private boolean shouldExtractInsertion = false;
- private boolean shouldExtractDeletion = false;
+ protected boolean hasNoExtractionNeed = true;
+ protected boolean shouldExtractInsertion = false;
+ protected boolean shouldExtractDeletion = false;
@Override
public void validate(final PipeParameterValidator validator) throws
Exception {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
index a8c35b78333..fa23d12a2e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
@@ -19,6 +19,551 @@
package org.apache.iotdb.db.pipe.table.extractor.dataregion;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
+import
org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionWatermarkInjector;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
+import
org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionTsFileExtractor;
+import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHeartbeatExtractor;
+import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
+import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
+import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
+import
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.PipeDataRegionExtractorMetrics;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
-public class IoTDBDataRegionTableModelExtractor extends
IoTDBDataRegionExtractor {}
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_EXCLUSION_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_INCLUSION_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_EXCLUSION_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_INCLUSION_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_FORMAT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_WATERMARK_INTERVAL_KEY;
+import static
org.apache.iotdb.commons.pipe.datastructure.options.PipeInclusionOptions.hasAtLeastOneOption;
+import static
org.apache.iotdb.commons.pipe.datastructure.options.PipeInclusionOptions.optionsAreAllLegal;
+
+public class IoTDBDataRegionTableModelExtractor extends
IoTDBDataRegionExtractor {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(IoTDBDataRegionTableModelExtractor.class);
+
+  /**
+   * Validates the extractor attributes and, when there is something to extract, builds the
+   * historical and realtime sub-extractors and lets them validate the same attributes.
+   *
+   * <p>Returns early, leaving {@code hasNoExtractionNeed} unchanged, when the inclusion/exclusion
+   * options select neither insertion nor deletion.
+   *
+   * @param validator validator wrapping the pipe parameters to check
+   * @throws PipeException if insertion is selected while the data region consensus protocol is
+   *     ratis
+   * @throws IllegalArgumentException if the path pattern is rejected by {@code validatePattern}
+   * @throws Exception if an attribute check or a sub-extractor validation fails
+   */
+  @Override
+  public void validate(final PipeParameterValidator validator) throws Exception {
+    final PipeParameters parameters = validator.getParameters();
+
+    validator
+        .validate(
+            args -> optionsAreAllLegal((String) args),
+            "The 'inclusion' string contains illegal path.",
+            parameters.getStringOrDefault(
+                Arrays.asList(EXTRACTOR_INCLUSION_KEY, SOURCE_INCLUSION_KEY),
+                EXTRACTOR_INCLUSION_DEFAULT_VALUE))
+        .validate(
+            args -> optionsAreAllLegal((String) args),
+            "The 'inclusion.exclusion' string contains illegal path.",
+            parameters.getStringOrDefault(
+                Arrays.asList(EXTRACTOR_EXCLUSION_KEY, SOURCE_EXCLUSION_KEY),
+                EXTRACTOR_EXCLUSION_DEFAULT_VALUE))
+        .validate(
+            args -> hasAtLeastOneOption((String) args[0], (String) args[1]),
+            "The pipe inclusion content can't be empty.",
+            parameters.getStringOrDefault(
+                Arrays.asList(EXTRACTOR_INCLUSION_KEY, SOURCE_INCLUSION_KEY),
+                EXTRACTOR_INCLUSION_DEFAULT_VALUE),
+            parameters.getStringOrDefault(
+                Arrays.asList(EXTRACTOR_EXCLUSION_KEY, SOURCE_EXCLUSION_KEY),
+                EXTRACTOR_EXCLUSION_DEFAULT_VALUE));
+
+    // Left: whether insertion is listened to. Right: whether deletion is listened to.
+    final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
+        DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters);
+    final boolean extractInsertion = insertionDeletionListeningOptionPair.getLeft();
+    final boolean extractDeletion = insertionDeletionListeningOptionPair.getRight();
+    if (!extractInsertion && !extractDeletion) {
+      return;
+    }
+    hasNoExtractionNeed = false;
+    shouldExtractInsertion = extractInsertion;
+    shouldExtractDeletion = extractDeletion;
+
+    if (extractInsertion
+        && IoTDBDescriptor.getInstance()
+            .getConfig()
+            .getDataRegionConsensusProtocolClass()
+            .equals(ConsensusFactory.RATIS_CONSENSUS)) {
+      throw new PipeException(
+          "The pipe cannot transfer data when data region is using ratis consensus.");
+    }
+
+    // Validate extractor.pattern.format is within valid range
+    validator
+        .validateAttributeValueRange(
+            EXTRACTOR_PATTERN_FORMAT_KEY,
+            true,
+            EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE,
+            EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE)
+        .validateAttributeValueRange(
+            SOURCE_PATTERN_FORMAT_KEY,
+            true,
+            EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE,
+            EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE);
+
+    // Get the pattern format to check whether the pattern is legal
+    final PipePattern pattern = PipePattern.parsePipePatternFromSourceParameters(parameters);
+
+    // Check whether the pattern is legal (relies on shouldExtractDeletion assigned above)
+    validatePattern(pattern);
+
+    // Validate extractor.history.enable and extractor.realtime.enable
+    validator
+        .validateAttributeValueRange(
+            EXTRACTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString())
+        .validateAttributeValueRange(
+            EXTRACTOR_REALTIME_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString())
+        .validateAttributeValueRange(
+            SOURCE_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString())
+        .validateAttributeValueRange(
+            SOURCE_REALTIME_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString())
+        .validate(
+            args -> (boolean) args[0] || (boolean) args[1],
+            "Should not set both history.enable and realtime.enable to false.",
+            parameters.getBooleanOrDefault(
+                Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY),
+                EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE),
+            parameters.getBooleanOrDefault(
+                Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY),
+                EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE));
+
+    // The extractor.* spellings are checked together with the source.* ones so that both
+    // aliases of start-time / end-time trigger the same validation and the same warning.
+    final boolean hasStartOrEndTime =
+        parameters.hasAnyAttributes(
+            SOURCE_START_TIME_KEY,
+            EXTRACTOR_START_TIME_KEY,
+            SOURCE_END_TIME_KEY,
+            EXTRACTOR_END_TIME_KEY);
+
+    // Validate extractor.realtime.mode
+    if (parameters.getBooleanOrDefault(
+            Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY),
+            EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)
+        || hasStartOrEndTime) {
+      validator.validateAttributeValueRange(
+          parameters.hasAttribute(EXTRACTOR_REALTIME_MODE_KEY)
+              ? EXTRACTOR_REALTIME_MODE_KEY
+              : SOURCE_REALTIME_MODE_KEY,
+          true,
+          EXTRACTOR_REALTIME_MODE_FILE_VALUE,
+          EXTRACTOR_REALTIME_MODE_HYBRID_VALUE,
+          EXTRACTOR_REALTIME_MODE_LOG_VALUE,
+          EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE,
+          EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE,
+          EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE);
+    }
+
+    // Validate source.start-time and source.end-time
+    if (hasStartOrEndTime
+        && parameters.hasAnyAttributes(
+            EXTRACTOR_HISTORY_ENABLE_KEY,
+            EXTRACTOR_REALTIME_ENABLE_KEY,
+            SOURCE_HISTORY_ENABLE_KEY,
+            SOURCE_REALTIME_ENABLE_KEY)) {
+      LOGGER.warn(
+          "When {}, {}, {} or {} is specified, specifying {}, {}, {} and {} is invalid.",
+          SOURCE_START_TIME_KEY,
+          EXTRACTOR_START_TIME_KEY,
+          SOURCE_END_TIME_KEY,
+          EXTRACTOR_END_TIME_KEY,
+          SOURCE_HISTORY_START_TIME_KEY,
+          EXTRACTOR_HISTORY_START_TIME_KEY,
+          SOURCE_HISTORY_END_TIME_KEY,
+          EXTRACTOR_HISTORY_END_TIME_KEY);
+    }
+
+    constructHistoricalExtractor();
+    constructRealtimeExtractor(parameters);
+
+    historicalExtractor.validate(validator);
+    realtimeExtractor.validate(validator);
+  }
+
+  /**
+   * Rejects patterns that cannot be used by this extractor.
+   *
+   * @param pattern the parsed path pattern
+   * @throws IllegalArgumentException if the pattern is not legal, or if deletion is extracted and
+   *     the pattern is not an {@link IoTDBPipePattern} that is a prefix or a full path
+   */
+  private void validatePattern(final PipePattern pattern) {
+    if (!pattern.isLegal()) {
+      throw new IllegalArgumentException(String.format("Pattern \"%s\" is illegal.", pattern));
+    }
+
+    // The stricter shape requirement only applies when deletions are extracted.
+    if (!shouldExtractDeletion) {
+      return;
+    }
+
+    final boolean isPrefixOrFullPath;
+    if (pattern instanceof IoTDBPipePattern) {
+      final IoTDBPipePattern iotdbPattern = (IoTDBPipePattern) pattern;
+      isPrefixOrFullPath = iotdbPattern.isPrefix() || iotdbPattern.isFullPath();
+    } else {
+      isPrefixOrFullPath = false;
+    }
+
+    if (!isPrefixOrFullPath) {
+      throw new IllegalArgumentException(
+          String.format(
+              "The path pattern %s is not valid for the source. Only prefix or full path is allowed.",
+              pattern));
+    }
+  }
+
+  /** Installs the TsFile-based historical extractor; it is always enabled. */
+  private void constructHistoricalExtractor() {
+    this.historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor();
+  }
+
+  /**
+   * Chooses the realtime extractor implementation from the pipe parameters.
+   *
+   * <ul>
+   *   <li>realtime disabled, or mode is query/snapshot: heartbeat-only extractor
+   *   <li>no realtime mode given: hybrid extractor
+   *   <li>file/batch: TsFile extractor; hybrid/log/stream: hybrid extractor; forced-log: log
+   *       extractor; anything else: hybrid extractor with a warning
+   * </ul>
+   *
+   * <p>Every non-TsFile, non-heartbeat choice first goes through {@code checkWalEnable}.
+   *
+   * @param parameters the pipe parameters
+   * @throws IllegalPathException propagated from {@code checkWalEnable}
+   */
+  private void constructRealtimeExtractor(final PipeParameters parameters)
+      throws IllegalPathException {
+    // Use heartbeat only extractor if disable realtime extractor
+    if (!parameters.getBooleanOrDefault(
+        Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY),
+        EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
+      realtimeExtractor = new PipeRealtimeDataRegionHeartbeatExtractor();
+      LOGGER.info(
+          "Pipe: '{}' is set to false, use heartbeat realtime extractor.",
+          EXTRACTOR_REALTIME_ENABLE_KEY);
+      return;
+    }
+
+    // Use heartbeat only extractor if enable snapshot (or query) mode
+    final String extractorModeValue =
+        parameters.getStringOrDefault(
+            Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), EXTRACTOR_MODE_DEFAULT_VALUE);
+    if (extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE)
+        || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)) {
+      realtimeExtractor = new PipeRealtimeDataRegionHeartbeatExtractor();
+      // Log the value that was actually configured; it may be the query value, not snapshot.
+      LOGGER.info(
+          "Pipe: '{}' is set to {}, use heartbeat realtime extractor.",
+          EXTRACTOR_MODE_KEY,
+          extractorModeValue);
+      return;
+    }
+
+    // Use hybrid mode by default
+    if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) {
+      checkWalEnable(parameters);
+      realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
+      LOGGER.info(
+          "Pipe: '{}' is not set, use hybrid mode by default.", EXTRACTOR_REALTIME_MODE_KEY);
+      return;
+    }
+
+    // Read once; reused by the fallback warning below.
+    final String realtimeModeValue =
+        parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY);
+    switch (realtimeModeValue) {
+      case EXTRACTOR_REALTIME_MODE_FILE_VALUE:
+      case EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE:
+        realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
+        break;
+      case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE:
+      case EXTRACTOR_REALTIME_MODE_LOG_VALUE:
+      case EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE:
+        checkWalEnable(parameters);
+        realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
+        break;
+      case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE:
+        checkWalEnable(parameters);
+        realtimeExtractor = new PipeRealtimeDataRegionLogExtractor();
+        break;
+      default:
+        checkWalEnable(parameters);
+        realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
+        LOGGER.warn(
+            "Pipe: Unsupported extractor realtime mode: {}, create a hybrid extractor.",
+            realtimeModeValue);
+    }
+  }
+
+  /**
+   * Fails when insertion is to be extracted but the configured WAL mode is {@code DISABLE}.
+   *
+   * @param parameters the pipe parameters used to derive the insertion listening option
+   * @throws IllegalPathException propagated from parsing the listening options
+   * @throws PipeException if insertion is listened to and WAL is disabled
+   */
+  private void checkWalEnable(final PipeParameters parameters) throws IllegalPathException {
+    final Boolean listensToInsertion =
+        DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters).getLeft();
+    if (!Boolean.TRUE.equals(listensToInsertion)) {
+      return;
+    }
+
+    final boolean walDisabled =
+        IoTDBDescriptor.getInstance().getConfig().getWalMode().equals(WALMode.DISABLE);
+    if (walDisabled) {
+      throw new PipeException(
+          "The pipe cannot transfer realtime insertion if data region disables wal. Please set 'realtime.mode'='batch' in source parameters when enabling realtime transmission.");
+    }
+  }
+
+  /**
+   * Reads identifiers from the runtime environment, customizes both sub-extractors, optionally
+   * installs a watermark injector and registers this extractor with the metric sets. Does
+   * nothing when {@code hasNoExtractionNeed} is set.
+   *
+   * @param parameters the pipe parameters
+   * @param configuration runtime configuration whose environment is a {@link
+   *     PipeTaskExtractorRuntimeEnvironment}
+   * @throws Exception if customizing a sub-extractor fails
+   */
+  @Override
+  public void customize(
+      final PipeParameters parameters, final PipeExtractorRuntimeConfiguration configuration)
+      throws Exception {
+    if (hasNoExtractionNeed) {
+      return;
+    }
+
+    final PipeTaskExtractorRuntimeEnvironment runtimeEnvironment =
+        (PipeTaskExtractorRuntimeEnvironment) configuration.getRuntimeEnvironment();
+    regionId = runtimeEnvironment.getRegionId();
+    pipeName = runtimeEnvironment.getPipeName();
+    creationTime = runtimeEnvironment.getCreationTime();
+    pipeTaskMeta = runtimeEnvironment.getPipeTaskMeta();
+    taskID = pipeName + "_" + regionId + "_" + creationTime;
+
+    isForwardingPipeRequests =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(
+                PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+                PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
+            PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+
+    historicalExtractor.customize(parameters, configuration);
+    realtimeExtractor.customize(parameters, configuration);
+
+    // A watermark injector is only created for an explicitly configured, positive interval.
+    final boolean watermarkIntervalConfigured =
+        parameters.hasAnyAttributes(
+            EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY);
+    if (watermarkIntervalConfigured) {
+      final long intervalInMs =
+          parameters.getLongOrDefault(
+              Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY),
+              EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE);
+      if (intervalInMs > 0) {
+        watermarkInjector = new DataRegionWatermarkInjector(regionId, intervalInMs);
+        LOGGER.info(
+            "Pipe {}@{}: Set watermark injector with interval {} ms.",
+            pipeName,
+            regionId,
+            watermarkInjector.getInjectionIntervalInMs());
+      }
+    }
+
+    // Metrics are keyed by taskID, so registration has to come after it is assigned.
+    PipeDataRegionExtractorMetrics.getInstance().register(this);
+    PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
+  }
+
+  /**
+   * Starts the realtime and historical sub-extractors once.
+   *
+   * <p>Does nothing when there is nothing to extract or when the extractor was already started.
+   * The started flag is claimed with a single atomic compare-and-set, so concurrent callers
+   * cannot both pass the guard.
+   *
+   * <p>NOTE(review): assumes the inherited {@code hasBeenStarted} is an {@code AtomicBoolean};
+   * confirm against the parent class.
+   *
+   * @throws PipeException if starting either sub-extractor failed
+   * @throws Exception declared by the overridden method
+   */
+  @Override
+  public void start() throws Exception {
+    if (hasNoExtractionNeed) {
+      return;
+    }
+    // Only the caller that flips the flag from false to true proceeds.
+    if (!hasBeenStarted.compareAndSet(false, true)) {
+      return;
+    }
+
+    final long startTime = System.currentTimeMillis();
+    LOGGER.info(
+        "Pipe {}@{}: Starting historical extractor {} and realtime extractor {}.",
+        pipeName,
+        regionId,
+        historicalExtractor.getClass().getSimpleName(),
+        realtimeExtractor.getClass().getSimpleName());
+
+    final AtomicReference<Exception> exceptionHolder = new AtomicReference<>(null);
+    final DataRegionId dataRegionIdObject = new DataRegionId(this.regionId);
+    while (true) {
+      // try to start extractors in the data region ...
+      // first try to run if data region exists, then try to run if data region does not exist.
+      // both conditions fail is not common, which means the data region is created during the
+      // runIfPresent and runIfAbsent operations. in this case, we need to retry.
+      if (StorageEngine.getInstance()
+              .runIfPresent(
+                  dataRegionIdObject,
+                  (dataRegion -> {
+                    dataRegion.writeLock(
+                        String.format(
+                            "Pipe: starting %s", IoTDBDataRegionExtractor.class.getName()));
+                    try {
+                      startHistoricalExtractorAndRealtimeExtractor(exceptionHolder);
+                    } finally {
+                      dataRegion.writeUnlock();
+                    }
+                  }))
+          || StorageEngine.getInstance()
+              .runIfAbsent(
+                  dataRegionIdObject,
+                  () -> startHistoricalExtractorAndRealtimeExtractor(exceptionHolder))) {
+        rethrowExceptionIfAny(exceptionHolder);
+
+        LOGGER.info(
+            "Pipe {}@{}: Started historical extractor {} and realtime extractor {} successfully within {} ms.",
+            pipeName,
+            regionId,
+            historicalExtractor.getClass().getSimpleName(),
+            realtimeExtractor.getClass().getSimpleName(),
+            System.currentTimeMillis() - startTime);
+        return;
+      }
+      rethrowExceptionIfAny(exceptionHolder);
+    }
+  }
+
+  /**
+   * Starts the realtime extractor and then the historical extractor. Any exception is logged and
+   * stored in {@code exceptionHolder} instead of being thrown.
+   *
+   * @param exceptionHolder receives the exception raised while starting, if any
+   */
+  private void startHistoricalExtractorAndRealtimeExtractor(
+      final AtomicReference<Exception> exceptionHolder) {
+    try {
+      // Realtime goes first so that nothing written during the historical phase is missed.
+      // Historical collection flushes data and then adds all tsFile events; writes can still
+      // arrive while those events are being added, and they would be lost if the realtime
+      // extractor were started afterwards. The price is some retransmission, which is
+      // acceptable according to the idempotency of IoTDB.
+      realtimeExtractor.start();
+      historicalExtractor.start();
+    } catch (final Exception startFailure) {
+      exceptionHolder.set(startFailure);
+      LOGGER.warn(
+          "Pipe {}@{}: Start historical extractor {} and realtime extractor {} error.",
+          pipeName,
+          regionId,
+          historicalExtractor.getClass().getSimpleName(),
+          realtimeExtractor.getClass().getSimpleName(),
+          startFailure);
+    }
+  }
+
+  /**
+   * Throws a {@link PipeException} wrapping the held exception, if one is present.
+   *
+   * @param exceptionHolder holder that may contain a start-up failure
+   * @throws PipeException if the holder is not empty
+   */
+  private void rethrowExceptionIfAny(final AtomicReference<Exception> exceptionHolder) {
+    final Exception cause = exceptionHolder.get();
+    if (cause == null) {
+      return;
+    }
+    throw new PipeException("failed to start extractors.", cause);
+  }
+
+  /**
+   * Supplies the next event: historical events until the historical extractor reports that it
+   * has consumed everything, afterwards an injected watermark if one is produced, otherwise a
+   * realtime event. Tablet, TsFile and heartbeat events are marked in the extractor metrics.
+   *
+   * @return the next event, or {@code null} when nothing is extracted or no event is available
+   * @throws Exception if a sub-extractor fails to supply
+   */
+  @Override
+  public Event supply() throws Exception {
+    if (hasNoExtractionNeed) {
+      return null;
+    }
+
+    Event event;
+    if (historicalExtractor.hasConsumedAll()) {
+      // A watermark, when available, takes precedence over realtime events.
+      event = Objects.isNull(watermarkInjector) ? null : watermarkInjector.inject();
+      if (Objects.isNull(event)) {
+        event = realtimeExtractor.supply();
+      }
+    } else {
+      event = historicalExtractor.supply();
+    }
+
+    if (Objects.isNull(event)) {
+      return null;
+    }
+
+    if (event instanceof TabletInsertionEvent) {
+      PipeDataRegionExtractorMetrics.getInstance().markTabletEvent(taskID);
+    } else if (event instanceof TsFileInsertionEvent) {
+      PipeDataRegionExtractorMetrics.getInstance().markTsFileEvent(taskID);
+    } else if (event instanceof PipeHeartbeatEvent) {
+      PipeDataRegionExtractorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
+    }
+    return event;
+  }
+
+  /**
+   * Closes both sub-extractors and deregisters the extractor metric. Does nothing when there is
+   * nothing to extract or when the extractor was never started.
+   *
+   * <p>The steps are chained with {@code finally} so that a failure while closing the historical
+   * extractor no longer prevents the realtime extractor from being closed or the metric from
+   * being deregistered. If more than one step throws, the exception of the latest step is the
+   * one that propagates.
+   *
+   * @throws Exception if closing a sub-extractor fails
+   */
+  @Override
+  public void close() throws Exception {
+    if (hasNoExtractionNeed || !hasBeenStarted.get()) {
+      return;
+    }
+
+    try {
+      historicalExtractor.close();
+    } finally {
+      try {
+        realtimeExtractor.close();
+      } finally {
+        if (Objects.nonNull(taskID)) {
+          PipeDataRegionExtractorMetrics.getInstance().deregister(taskID);
+        }
+      }
+    }
+  }
+
+ //////////////////////////// APIs provided for detecting stuck
////////////////////////////
+
+  /** Returns the insertion flag determined during validation. */
+  @Override
+  public boolean shouldExtractInsertion() {
+    return this.shouldExtractInsertion;
+  }
+
+  /** Returns {@code true} when the realtime extractor is the hybrid or the log implementation. */
+  @Override
+  public boolean isStreamMode() {
+    if (realtimeExtractor instanceof PipeRealtimeDataRegionHybridExtractor) {
+      return true;
+    }
+    return realtimeExtractor instanceof PipeRealtimeDataRegionLogExtractor;
+  }
+
+  /** Delegates to the historical extractor's {@code hasConsumedAll()}. */
+  @Override
+  public boolean hasConsumedAllHistoricalTsFiles() {
+    final boolean historicalDrained = historicalExtractor.hasConsumedAll();
+    return historicalDrained;
+  }
+
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
+
+  /** Returns the historical extractor's pending queue size, or 0 before the extractor started. */
+  @Override
+  public int getHistoricalTsFileInsertionEventCount() {
+    if (!hasBeenStarted.get()) {
+      return 0;
+    }
+    return historicalExtractor.getPendingQueueSize();
+  }
+
+  /** Returns the realtime extractor's tablet insertion event count, or 0 before start. */
+  @Override
+  public int getTabletInsertionEventCount() {
+    if (!hasBeenStarted.get()) {
+      return 0;
+    }
+    return realtimeExtractor.getTabletInsertionEventCount();
+  }
+
+  /** Returns the realtime extractor's TsFile insertion event count, or 0 before start. */
+  @Override
+  public int getRealtimeTsFileInsertionEventCount() {
+    if (!hasBeenStarted.get()) {
+      return 0;
+    }
+    return realtimeExtractor.getTsFileInsertionEventCount();
+  }
+
+  /** Returns the realtime extractor's heartbeat event count, or 0 before start. */
+  @Override
+  public int getPipeHeartbeatEventCount() {
+    if (!hasBeenStarted.get()) {
+      return 0;
+    }
+    return realtimeExtractor.getPipeHeartbeatEventCount();
+  }
+
+  /**
+   * Returns the historical pending queue size plus the realtime extractor's event count, or 0
+   * before the extractor started.
+   */
+  @Override
+  public int getEventCount() {
+    if (!hasBeenStarted.get()) {
+      return 0;
+    }
+    final int historicalPending = historicalExtractor.getPendingQueueSize();
+    final int realtimePending = realtimeExtractor.getEventCount();
+    return historicalPending + realtimePending;
+  }
+}