This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch pipe-table-model-3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipe-table-model-3 by this
push:
new c7e5678d29b Update IoTDBDataRegionTableModelExtractor.java
c7e5678d29b is described below
commit c7e5678d29b22b8f1a7146fd66d0d0debbc6bbb2
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Sep 26 19:09:50 2024 +0800
Update IoTDBDataRegionTableModelExtractor.java
---
.../IoTDBDataRegionTableModelExtractor.java | 119 +--------------------
1 file changed, 3 insertions(+), 116 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
index e1dd86ab9ee..8bee9a23be5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/table/extractor/dataregion/IoTDBDataRegionTableModelExtractor.java
@@ -20,8 +20,6 @@
package org.apache.iotdb.db.pipe.table.extractor.dataregion;
import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
-import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
@@ -42,22 +40,14 @@ import org.slf4j.LoggerFactory;
import java.util.Arrays;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_EXCLUSION_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_INCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE;
@@ -67,18 +57,11 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_EXCLUSION_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_INCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.datastructure.options.PipeInclusionOptions.hasAtLeastOneOption;
import static
org.apache.iotdb.commons.pipe.datastructure.options.PipeInclusionOptions.optionsAreAllLegal;
@@ -139,91 +122,11 @@ public class IoTDBDataRegionTableModelExtractor extends
IoTDBDataRegionExtractor
throw new PipeException(
"The pipe cannot transfer data when data region is using ratis
consensus.");
}
-
- // Validate extractor.pattern.format is within valid range
- validator
- .validateAttributeValueRange(
- EXTRACTOR_PATTERN_FORMAT_KEY,
- true,
- EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE,
- EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE)
- .validateAttributeValueRange(
- SOURCE_PATTERN_FORMAT_KEY,
- true,
- EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE,
- EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE);
-
- // Get the pattern format to check whether the pattern is legal
- final PipePattern pattern =
-
PipePattern.parsePipePatternFromSourceParameters(validator.getParameters());
-
- // Check whether the pattern is legal
- validatePattern(pattern);
-
- // Validate extractor.history.enable and extractor.realtime.enable
- validator
- .validateAttributeValueRange(
- EXTRACTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(),
Boolean.FALSE.toString())
- .validateAttributeValueRange(
- EXTRACTOR_REALTIME_ENABLE_KEY, true, Boolean.TRUE.toString(),
Boolean.FALSE.toString())
- .validateAttributeValueRange(
- SOURCE_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(),
Boolean.FALSE.toString())
- .validateAttributeValueRange(
- SOURCE_REALTIME_ENABLE_KEY, true, Boolean.TRUE.toString(),
Boolean.FALSE.toString())
- .validate(
- args -> (boolean) args[0] || (boolean) args[1],
- "Should not set both history.enable and realtime.enable to false.",
- validator
- .getParameters()
- .getBooleanOrDefault(
- Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
- EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE),
- validator
- .getParameters()
- .getBooleanOrDefault(
- Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
- EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE));
-
- // Validate extractor.realtime.mode
- if (validator
- .getParameters()
- .getBooleanOrDefault(
- Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
- EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)
- || validator.getParameters().hasAnyAttributes(SOURCE_START_TIME_KEY,
SOURCE_END_TIME_KEY)) {
- validator.validateAttributeValueRange(
- validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY)
- ? EXTRACTOR_REALTIME_MODE_KEY
- : SOURCE_REALTIME_MODE_KEY,
- true,
- EXTRACTOR_REALTIME_MODE_FILE_VALUE,
- EXTRACTOR_REALTIME_MODE_HYBRID_VALUE,
- EXTRACTOR_REALTIME_MODE_LOG_VALUE,
- EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE,
- EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE,
- EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE);
+ if (insertionDeletionListeningOptionPair.getRight().equals(true)) {
+ throw new PipeException("Table model extractor does not support deletion
listening.");
}
- // Validate source.start-time and source.end-time
- if (validator.getParameters().hasAnyAttributes(SOURCE_START_TIME_KEY,
SOURCE_END_TIME_KEY)
- && validator
- .getParameters()
- .hasAnyAttributes(
- EXTRACTOR_HISTORY_ENABLE_KEY,
- EXTRACTOR_REALTIME_ENABLE_KEY,
- SOURCE_HISTORY_ENABLE_KEY,
- SOURCE_REALTIME_ENABLE_KEY)) {
- LOGGER.warn(
- "When {}, {}, {} or {} is specified, specifying {}, {}, {} and {} is
invalid.",
- SOURCE_START_TIME_KEY,
- EXTRACTOR_START_TIME_KEY,
- SOURCE_END_TIME_KEY,
- EXTRACTOR_END_TIME_KEY,
- SOURCE_HISTORY_START_TIME_KEY,
- EXTRACTOR_HISTORY_START_TIME_KEY,
- SOURCE_HISTORY_END_TIME_KEY,
- EXTRACTOR_HISTORY_END_TIME_KEY);
- }
+ // TODO: SQL matcher construction and validation
constructHistoricalExtractor();
constructRealtimeExtractor(validator.getParameters());
@@ -232,22 +135,6 @@ public class IoTDBDataRegionTableModelExtractor extends
IoTDBDataRegionExtractor
realtimeExtractor.validate(validator);
}
- private void validatePattern(final PipePattern pattern) {
- if (!pattern.isLegal()) {
- throw new IllegalArgumentException(String.format("Pattern \"%s\" is
illegal.", pattern));
- }
-
- if (shouldExtractDeletion
- && !(pattern instanceof IoTDBPipePattern
- && (((IoTDBPipePattern) pattern).isPrefix()
- || ((IoTDBPipePattern) pattern).isFullPath()))) {
- throw new IllegalArgumentException(
- String.format(
- "The path pattern %s is not valid for the source. Only prefix or
full path is allowed.",
- pattern));
- }
- }
-
private void constructHistoricalExtractor() {
// Enable historical extractor by default
historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor();