This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-realtime-loose-range
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2db21c618973029162e36e443e77d51f8e3436e3
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Jun 17 12:52:12 2024 +0800

    parameters
---
 .../PipeHistoricalDataRegionTsFileExtractor.java   |  2 +-
 .../realtime/PipeRealtimeDataRegionExtractor.java  | 47 ++++++++++++++++------
 .../config/constant/PipeExtractorConstant.java     |  3 ++
 3 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 237dace7c02..a0dadcfc9fa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -109,8 +109,8 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
   private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time
   private long historicalDataExtractionTimeLowerBound; // Arrival time
 
-  private boolean sloppyPattern;
   private boolean sloppyTimeRange; // true to disable time range filter after extraction
+  private boolean sloppyPattern; // true to disable pattern filter after extraction
 
   private Pair<Boolean, Boolean> listeningOptionPair;
   private boolean shouldExtractInsertion;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index c26db376da0..262f2d824aa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -54,6 +54,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -61,7 +62,10 @@ import java.util.stream.Collectors;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_DEFAULT_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_PATH_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_TIME_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY;
@@ -102,6 +106,7 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {
   private boolean shouldTransferModFile; // Whether to transfer mods
 
   private boolean sloppyTimeRange; // true to disable time range filter after extraction
+  private boolean sloppyPattern; // true to disable pattern filter after extraction
 
   // This queue is used to store pending events extracted by the method extract(). The method
   // supply() will poll events from this queue and send them to the next pipe plugin.
@@ -142,10 +147,33 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {
                 EXTRACTOR_END_TIME_KEY,
                 realtimeDataExtractionEndTime));
       }
+    } catch (final PipeParameterNotValidException e) {
+      throw e;
     } catch (final Exception e) {
       // compatible with the current validation framework
       throw new PipeParameterNotValidException(e.getMessage());
     }
+
+    final Set<String> sloppyOptionSet =
+        Arrays.stream(
+                parameters
+                    .getStringOrDefault(
+                        Arrays.asList(
+                            EXTRACTOR_REALTIME_LOOSE_RANGE_KEY, SOURCE_REALTIME_LOOSE_RANGE_KEY),
+                        EXTRACTOR_REALTIME_LOOSE_RANGE_DEFAULT_VALUE)
+                    .split(","))
+            .map(String::trim)
+            .map(String::toLowerCase)
+            .collect(Collectors.toSet());
+    // Avoid empty string
+    sloppyOptionSet.remove("");
+    sloppyTimeRange = sloppyOptionSet.contains(EXTRACTOR_REALTIME_LOOSE_RANGE_TIME_VALUE);
+    sloppyPattern = sloppyOptionSet.contains(EXTRACTOR_REALTIME_LOOSE_RANGE_PATH_VALUE);
+    if (!sloppyOptionSet.isEmpty()) {
+      throw new PipeParameterNotValidException(
+          String.format(
+              "Parameters in set %s are not allowed in 'realtime.loose-range'", sloppyOptionSet));
+    }
   }
 
   @Override
@@ -203,18 +231,13 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
             Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY),
             EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE || shouldExtractDeletion);
 
-    sloppyTimeRange =
-        Arrays.stream(
-                parameters
-                    .getStringOrDefault(
-                        Arrays.asList(
-                            EXTRACTOR_REALTIME_LOOSE_RANGE_KEY, SOURCE_REALTIME_LOOSE_RANGE_KEY),
-                        "")
-                    .split(","))
-            .map(String::trim)
-            .map(String::toLowerCase)
-            .collect(Collectors.toSet())
-            .contains("time");
+    if (LOGGER.isInfoEnabled()) {
+      LOGGER.info(
+          "Pipe {}@{}: realtime data region extractor is initialized with parameters: {}.",
+          pipeName,
+          dataRegionId,
+          parameters);
+    }
   }
 
   @Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index 99efcd48d61..1bf147e998b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -84,6 +84,9 @@ public class PipeExtractorConstant {
   public static final String EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE = "batch";
   public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_KEY = "extractor.realtime.loose-range";
   public static final String SOURCE_REALTIME_LOOSE_RANGE_KEY = "source.realtime.loose-range";
+  public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_TIME_VALUE = "time";
+  public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_PATH_VALUE = "path";
+  public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_DEFAULT_VALUE = "";
 
   public static final String EXTRACTOR_START_TIME_KEY = "extractor.start-time";
   public static final String SOURCE_START_TIME_KEY = "source.start-time";

Reply via email to