This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-realtime-loose-range in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2db21c618973029162e36e443e77d51f8e3436e3 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Jun 17 12:52:12 2024 +0800 parameters --- .../PipeHistoricalDataRegionTsFileExtractor.java | 2 +- .../realtime/PipeRealtimeDataRegionExtractor.java | 47 ++++++++++++++++------ .../config/constant/PipeExtractorConstant.java | 3 ++ 3 files changed, 39 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index 237dace7c02..a0dadcfc9fa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -109,8 +109,8 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time private long historicalDataExtractionTimeLowerBound; // Arrival time - private boolean sloppyPattern; private boolean sloppyTimeRange; // true to disable time range filter after extraction + private boolean sloppyPattern; // true to disable pattern filter after extraction private Pair<Boolean, Boolean> listeningOptionPair; private boolean shouldExtractInsertion; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java index c26db376da0..262f2d824aa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java @@ -54,6 +54,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -61,7 +62,10 @@ import java.util.stream.Collectors; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_PATH_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_TIME_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY; @@ -102,6 +106,7 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { private boolean shouldTransferModFile; // Whether to transfer mods private boolean sloppyTimeRange; // true to disable time range filter after extraction + private boolean sloppyPattern; // true to disable pattern filter after extraction // This queue is used to store pending events 
extracted by the method extract(). The method // supply() will poll events from this queue and send them to the next pipe plugin. @@ -142,10 +147,33 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { EXTRACTOR_END_TIME_KEY, realtimeDataExtractionEndTime)); } + } catch (final PipeParameterNotValidException e) { + throw e; } catch (final Exception e) { // compatible with the current validation framework throw new PipeParameterNotValidException(e.getMessage()); } + + final Set<String> sloppyOptionSet = + Arrays.stream( + parameters + .getStringOrDefault( + Arrays.asList( + EXTRACTOR_REALTIME_LOOSE_RANGE_KEY, SOURCE_REALTIME_LOOSE_RANGE_KEY), + EXTRACTOR_REALTIME_LOOSE_RANGE_DEFAULT_VALUE) + .split(",")) + .map(String::trim) + .map(String::toLowerCase) + .collect(Collectors.toSet()); + // Avoid empty string + sloppyOptionSet.remove(""); + sloppyTimeRange = sloppyOptionSet.contains(EXTRACTOR_REALTIME_LOOSE_RANGE_TIME_VALUE); + sloppyPattern = sloppyOptionSet.contains(EXTRACTOR_REALTIME_LOOSE_RANGE_PATH_VALUE); + if (!sloppyOptionSet.isEmpty()) { + throw new PipeParameterNotValidException( + String.format( + "Parameters in set %s are not allowed in 'realtime.loose-range'", sloppyOptionSet)); + } } @Override @@ -203,18 +231,13 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY), EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE || shouldExtractDeletion); - sloppyTimeRange = - Arrays.stream( - parameters - .getStringOrDefault( - Arrays.asList( - EXTRACTOR_REALTIME_LOOSE_RANGE_KEY, SOURCE_REALTIME_LOOSE_RANGE_KEY), - "") - .split(",")) - .map(String::trim) - .map(String::toLowerCase) - .collect(Collectors.toSet()) - .contains("time"); + if (LOGGER.isInfoEnabled()) { + LOGGER.info( + "Pipe {}@{}: realtime data region extractor is initialized with parameters: {}.", + pipeName, + dataRegionId, + parameters); + } } @Override diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java index 99efcd48d61..1bf147e998b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java @@ -84,6 +84,9 @@ public class PipeExtractorConstant { public static final String EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE = "batch"; public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_KEY = "extractor.realtime.loose-range"; public static final String SOURCE_REALTIME_LOOSE_RANGE_KEY = "source.realtime.loose-range"; + public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_TIME_VALUE = "time"; + public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_PATH_VALUE = "path"; + public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_DEFAULT_VALUE = ""; public static final String EXTRACTOR_START_TIME_KEY = "extractor.start-time"; public static final String SOURCE_START_TIME_KEY = "source.start-time";
