This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cd5ecde5f2 Pipe: Support "source.realtime.loose-range" = "path" in iotdb-source (#12751)
1cd5ecde5f2 is described below

commit 1cd5ecde5f2fd060d9282b6d93331956a55631c0
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Jun 18 16:54:50 2024 +0800

    Pipe: Support "source.realtime.loose-range" = "path" in iotdb-source (#12751)
---
 .../pipe/it/autocreate/IoTDBPipeExtractorIT.java   | 60 ++++++++++++++-
 .../db/pipe/event/UserDefinedEnrichedEvent.java    |  5 ++
 .../event/common/heartbeat/PipeHeartbeatEvent.java |  5 ++
 .../tablet/PipeInsertNodeTabletInsertionEvent.java | 86 ++++++++++++++--------
 .../common/tablet/PipeRawTabletInsertionEvent.java |  6 ++
 .../event/common/terminate/PipeTerminateEvent.java |  5 ++
 .../common/tsfile/PipeTsFileInsertionEvent.java    | 47 +++++++-----
 .../db/pipe/event/realtime/PipeRealtimeEvent.java  | 29 +++++---
 .../PipeHistoricalDataRegionTsFileExtractor.java   |  7 +-
 .../realtime/PipeRealtimeDataRegionExtractor.java  | 71 +++++++++++++-----
 .../pattern/CachedSchemaPatternMatcherTest.java    | 30 +++++++-
 .../config/constant/PipeExtractorConstant.java     |  3 +
 .../iotdb/commons/pipe/event/EnrichedEvent.java    |  2 +
 .../commons/pipe/event/PipeSnapshotEvent.java      |  5 ++
 .../commons/pipe/event/PipeWritePlanEvent.java     |  5 ++
 .../commons/pipe/event/ProgressReportEvent.java    |  5 ++
 16 files changed, 288 insertions(+), 83 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
index f83699f30b7..8ce885935b8 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
@@ -904,7 +904,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
   }
 
   @Test
-  public void testLooseRange() throws Exception {
+  public void testHistoryLooseRange() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
     final String receiverIp = receiverDataNode.getIp();
@@ -964,6 +964,64 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
     }
   }
 
+  @Test
+  public void testRealtimeLooseRange() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    final Map<String, String> extractorAttributes = new HashMap<>();
+    final Map<String, String> processorAttributes = new HashMap<>();
+    final Map<String, String> connectorAttributes = new HashMap<>();
+
+    extractorAttributes.put("source.path", "root.db.d1.at1");
+    extractorAttributes.put("source.inclusion", "data.insert");
+    extractorAttributes.put("source.realtime.loose-range", "time, path");
+    extractorAttributes.put("source.start-time", "2000");
+    extractorAttributes.put("source.end-time", "10000");
+    extractorAttributes.put("source.realtime.mode", "batch");
+
+    connectorAttributes.put("connector", "iotdb-thrift-connector");
+    connectorAttributes.put("connector.batch.enable", "false");
+    connectorAttributes.put("connector.ip", receiverIp);
+    connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
+
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db.d1 (time, at1, at2)" + " values (1000, 1, 
2), (3000, 3, 4)",
+              "flush"))) {
+        return;
+      }
+
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db1.d1 (time, at1, at2)" + " values (1000, 1, 
2), (3000, 3, 4)",
+              "flush"))) {
+        return;
+      }
+
+      TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db.d1 (time, at1)" + " values (5000, 1), 
(16000, 3)",
+              "insert into root.db.d1 (time, at1, at2)" + " values (5001, 1, 
2), (6001, 3, 4)",
+              "flush"));
+    }
+  }
+
   private void assertTimeseriesCountOnReceiver(BaseEnv receiverEnv, int count) 
{
     TestUtils.assertDataEventuallyOnEnv(
         receiverEnv, "count timeseries", "count(timeseries),", 
Collections.singleton(count + ","));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
index 4803158db2e..82829a4542b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
@@ -92,4 +92,9 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent {
   public boolean mayEventTimeOverlappedWithTimeRange() {
     return enrichedEvent.mayEventTimeOverlappedWithTimeRange();
   }
+
+  @Override
+  public boolean mayEventPathsOverlappedWithPattern() {
+    return enrichedEvent.mayEventPathsOverlappedWithPattern();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index f82d799b7d3..d3830b6dcf0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -121,6 +121,11 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
     return true;
   }
 
+  @Override
+  public boolean mayEventPathsOverlappedWithPattern() {
+    return true;
+  }
+
   /////////////////////////////// Whether to print 
///////////////////////////////
 
   public boolean isShouldPrintMessage() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index b9ddfd0d55d..abb0fcb572c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -205,29 +204,72 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   @Override
   public boolean mayEventTimeOverlappedWithTimeRange() {
     try {
-      final InsertNode insertNode = getInsertNode();
+      final InsertNode insertNode = getInsertNodeViaCacheIfPossible();
+      if (Objects.isNull(insertNode)) {
+        return true;
+      }
+
       if (insertNode instanceof InsertRowNode) {
         final long timestamp = ((InsertRowNode) insertNode).getTime();
         return startTime <= timestamp && timestamp <= endTime;
-      } else if (insertNode instanceof InsertTabletNode) {
+      }
+
+      if (insertNode instanceof InsertTabletNode) {
         final long[] timestamps = ((InsertTabletNode) insertNode).getTimes();
         if (Objects.isNull(timestamps) || timestamps.length == 0) {
           return false;
         }
         // We assume that `timestamps` is ordered.
         return startTime <= timestamps[timestamps.length - 1] && timestamps[0] 
<= endTime;
-      } else if (insertNode instanceof InsertRowsNode) {
-        for (final InsertRowNode node : ((InsertRowsNode) 
insertNode).getInsertRowNodeList()) {
-          final long timestamp = node.getTime();
-          if (startTime <= timestamp && timestamp <= endTime) {
-            return true;
-          }
-        }
-        return false;
-      } else {
-        throw new UnSupportedDataTypeException(
-            String.format("InsertNode type %s is not supported.", 
insertNode.getClass().getName()));
       }
+
+      if (insertNode instanceof InsertRowsNode) {
+        return ((InsertRowsNode) insertNode)
+            .getInsertRowNodeList().stream()
+                .anyMatch(
+                    insertRowNode -> {
+                      final long timestamp = insertRowNode.getTime();
+                      return startTime <= timestamp && timestamp <= endTime;
+                    });
+      }
+
+      return true;
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Exception occurred when determining the event time of 
PipeInsertNodeTabletInsertionEvent({}) overlaps with the time range: [{}, {}]. 
Returning true to ensure data integrity.",
+          this,
+          startTime,
+          endTime,
+          e);
+      return true;
+    }
+  }
+
+  @Override
+  public boolean mayEventPathsOverlappedWithPattern() {
+    try {
+      final InsertNode insertNode = getInsertNodeViaCacheIfPossible();
+      if (Objects.isNull(insertNode)) {
+        return true;
+      }
+
+      if (insertNode instanceof InsertRowNode || insertNode instanceof 
InsertTabletNode) {
+        final PartialPath devicePartialPath = insertNode.getDevicePath();
+        return Objects.isNull(devicePartialPath)
+            || 
pipePattern.mayOverlapWithDevice(devicePartialPath.getFullPath());
+      }
+
+      if (insertNode instanceof InsertRowsNode) {
+        return ((InsertRowsNode) insertNode)
+            .getInsertRowNodeList().stream()
+                .anyMatch(
+                    insertRowNode ->
+                        Objects.isNull(insertRowNode.getDevicePath())
+                            || pipePattern.mayOverlapWithDevice(
+                                insertRowNode.getDevicePath().getFullPath()));
+      }
+
+      return true;
     } catch (final Exception e) {
       LOGGER.warn(
           "Exception occurred when determining the event time of 
PipeInsertNodeTabletInsertionEvent({}) overlaps with the time range: [{}, {}]. 
Returning true to ensure data integrity.",
@@ -318,22 +360,6 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
 
   /////////////////////////// parsePatternOrTime ///////////////////////////
 
-  @Override
-  public boolean shouldParsePattern() {
-    final InsertNode node = getInsertNodeViaCacheIfPossible();
-    return super.shouldParsePattern()
-        && Objects.nonNull(pipePattern)
-        && (Objects.isNull(node)
-            || (node.getType() == PlanNodeType.INSERT_ROWS
-                ? ((InsertRowsNode) node)
-                    .getInsertRowNodeList().stream()
-                        .anyMatch(
-                            insertRowNode ->
-                                !pipePattern.coversDevice(
-                                    
insertRowNode.getDevicePath().getFullPath()))
-                : 
!pipePattern.coversDevice(node.getDevicePath().getFullPath())));
-  }
-
   public List<PipeRawTabletInsertionEvent> toRawTabletInsertionEvents() {
     final List<PipeRawTabletInsertionEvent> events =
         convertToTablets().stream()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 239f73b3c32..9769299ae68 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -191,6 +191,12 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
     return startTime <= timestamps[timestamps.length - 1] && timestamps[0] <= 
endTime;
   }
 
+  @Override
+  public boolean mayEventPathsOverlappedWithPattern() {
+    final String deviceId = getDeviceId();
+    return Objects.isNull(deviceId) || 
pipePattern.mayOverlapWithDevice(deviceId);
+  }
+
   public void markAsNeedToReport() {
     this.needToReport = true;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index cc423f1bf3d..146873ed3d2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -84,6 +84,11 @@ public class PipeTerminateEvent extends EnrichedEvent {
     return true;
   }
 
+  @Override
+  public boolean mayEventPathsOverlappedWithPattern() {
+    return true;
+  }
+
   @Override
   public void reportProgress() {
     PipeAgent.task().markCompleted(pipeName, dataRegionId);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index a7c0cc5c255..b7a883712df 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -33,12 +34,17 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class PipeTsFileInsertionEvent extends EnrichedEvent implements 
TsFileInsertionEvent {
@@ -277,28 +283,35 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
         : resource.getFileStartTime() <= endTime;
   }
 
-  /////////////////////////// TsFileInsertionEvent ///////////////////////////
-
   @Override
-  public boolean shouldParseTimeOrPattern() {
-    boolean shouldParseTimeOrPattern = false;
+  public boolean mayEventPathsOverlappedWithPattern() {
+    if (!resource.isClosed()) {
+      return true;
+    }
+
     try {
-      shouldParseTimeOrPattern = super.shouldParseTimeOrPattern();
-      return shouldParseTimeOrPattern;
-    } finally {
-      // Super method will call shouldParsePattern() and then init 
dataContainer at
-      // shouldParsePattern(). If shouldParsePattern() returns false, 
dataContainer will
-      // not be used, so we need to close the resource here.
-      if (!shouldParseTimeOrPattern) {
-        close();
-      }
+      final Map<IDeviceID, Boolean> deviceIsAlignedMap =
+          PipeResourceManager.tsfile()
+              .getDeviceIsAlignedMapFromCache(
+                  
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
+      final Set<IDeviceID> deviceSet =
+          Objects.nonNull(deviceIsAlignedMap) ? deviceIsAlignedMap.keySet() : 
resource.getDevices();
+      return deviceSet.stream()
+          .anyMatch(
+              // TODO: use IDeviceID
+              deviceID ->
+                  pipePattern.mayOverlapWithDevice(((PlainDeviceID) 
deviceID).toStringID()));
+    } catch (final IOException e) {
+      LOGGER.warn(
+          "Pipe {}: failed to get devices from TsFile {}, extract it anyway",
+          pipeName,
+          resource.getTsFilePath(),
+          e);
+      return true;
     }
   }
 
-  @Override
-  public boolean shouldParsePattern() {
-    return super.shouldParsePattern() && 
initDataContainer().shouldParsePattern();
-  }
+  /////////////////////////// TsFileInsertionEvent ///////////////////////////
 
   @Override
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index ce8e7d9f0fa..7e12a909b1f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -129,10 +129,6 @@ public class PipeRealtimeEvent extends EnrichedEvent {
     return event.getProgressIndex();
   }
 
-  /**
-   * If pipe's pattern is database-level, then no need to parse event by 
pattern cause pipes are
-   * data-region-level.
-   */
   @Override
   public void skipParsingPattern() {
     event.skipParsingPattern();
@@ -143,6 +139,26 @@ public class PipeRealtimeEvent extends EnrichedEvent {
     event.skipParsingTime();
   }
 
+  @Override
+  public boolean shouldParseTime() {
+    return event.shouldParseTime();
+  }
+
+  @Override
+  public boolean shouldParsePattern() {
+    return event.shouldParsePattern();
+  }
+
+  @Override
+  public boolean mayEventTimeOverlappedWithTimeRange() {
+    return event.mayEventTimeOverlappedWithTimeRange();
+  }
+
+  @Override
+  public boolean mayEventPathsOverlappedWithPattern() {
+    return event.mayEventPathsOverlappedWithPattern();
+  }
+
   @Override
   public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       final String pipeName,
@@ -167,11 +183,6 @@ public class PipeRealtimeEvent extends EnrichedEvent {
     return event.isGeneratedByPipe();
   }
 
-  @Override
-  public boolean mayEventTimeOverlappedWithTimeRange() {
-    return event.mayEventTimeOverlappedWithTimeRange();
-  }
-
   @Override
   public String toString() {
     return "PipeRealtimeEvent{"
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 237dace7c02..1f36e98bec0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -109,8 +109,8 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
   private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time
   private long historicalDataExtractionTimeLowerBound; // Arrival time
 
-  private boolean sloppyPattern;
   private boolean sloppyTimeRange; // true to disable time range filter after 
extraction
+  private boolean sloppyPattern; // true to disable pattern filter after 
extraction
 
   private Pair<Boolean, Boolean> listeningOptionPair;
   private boolean shouldExtractInsertion;
@@ -142,10 +142,9 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                         EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE)
                     .split(","))
             .map(String::trim)
+            .filter(s -> !s.isEmpty())
             .map(String::toLowerCase)
             .collect(Collectors.toSet());
-    // Avoid empty string
-    sloppyOptionSet.remove("");
     sloppyTimeRange = 
sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE);
     sloppyPattern = 
sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE);
     if (!sloppyOptionSet.isEmpty()) {
@@ -183,6 +182,8 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                   EXTRACTOR_END_TIME_KEY,
                   historicalDataExtractionEndTime));
         }
+      } catch (final PipeParameterNotValidException e) {
+        throw e;
       } catch (final Exception e) {
         // compatible with the current validation framework
         throw new PipeParameterNotValidException(e.getMessage());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index c26db376da0..89e42ea0974 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -54,6 +54,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -61,7 +62,10 @@ import java.util.stream.Collectors;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_PATH_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_TIME_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY;
@@ -87,7 +91,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
   protected long realtimeDataExtractionStartTime = Long.MIN_VALUE; // Event 
time
   protected long realtimeDataExtractionEndTime = Long.MAX_VALUE; // Event time
 
-  private boolean disableSkippingTimeParse = false;
+  private boolean disableCheckingDataRegionTimePartitionCovering = false;
   private long startTimePartitionIdLowerBound; // calculated by 
realtimeDataExtractionStartTime
   private long endTimePartitionIdUpperBound; // calculated by 
realtimeDataExtractionEndTime
 
@@ -102,6 +106,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
   private boolean shouldTransferModFile; // Whether to transfer mods
 
   private boolean sloppyTimeRange; // true to disable time range filter after 
extraction
+  private boolean sloppyPattern; // true to disable pattern filter after 
extraction
 
   // This queue is used to store pending events extracted by the method 
extract(). The method
   // supply() will poll events from this queue and send them to the next pipe 
plugin.
@@ -142,10 +147,32 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
                 EXTRACTOR_END_TIME_KEY,
                 realtimeDataExtractionEndTime));
       }
+    } catch (final PipeParameterNotValidException e) {
+      throw e;
     } catch (final Exception e) {
       // compatible with the current validation framework
       throw new PipeParameterNotValidException(e.getMessage());
     }
+
+    final Set<String> sloppyOptionSet =
+        Arrays.stream(
+                parameters
+                    .getStringOrDefault(
+                        Arrays.asList(
+                            EXTRACTOR_REALTIME_LOOSE_RANGE_KEY, 
SOURCE_REALTIME_LOOSE_RANGE_KEY),
+                        EXTRACTOR_REALTIME_LOOSE_RANGE_DEFAULT_VALUE)
+                    .split(","))
+            .map(String::trim)
+            .filter(s -> !s.isEmpty())
+            .map(String::toLowerCase)
+            .collect(Collectors.toSet());
+    sloppyTimeRange = 
sloppyOptionSet.remove(EXTRACTOR_REALTIME_LOOSE_RANGE_TIME_VALUE);
+    sloppyPattern = 
sloppyOptionSet.remove(EXTRACTOR_REALTIME_LOOSE_RANGE_PATH_VALUE);
+    if (!sloppyOptionSet.isEmpty()) {
+      throw new PipeParameterNotValidException(
+          String.format(
+              "Parameters in set %s are not allowed in 
'realtime.loose-range'", sloppyOptionSet));
+    }
   }
 
   @Override
@@ -203,18 +230,13 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
             Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY),
             EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE || shouldExtractDeletion);
 
-    sloppyTimeRange =
-        Arrays.stream(
-                parameters
-                    .getStringOrDefault(
-                        Arrays.asList(
-                            EXTRACTOR_REALTIME_LOOSE_RANGE_KEY, 
SOURCE_REALTIME_LOOSE_RANGE_KEY),
-                        "")
-                    .split(","))
-            .map(String::trim)
-            .map(String::toLowerCase)
-            .collect(Collectors.toSet())
-            .contains("time");
+    if (LOGGER.isInfoEnabled()) {
+      LOGGER.info(
+          "Pipe {}@{}: realtime data region extractor is initialized with 
parameters: {}.",
+          pipeName,
+          dataRegionId,
+          parameters);
+    }
   }
 
   @Override
@@ -262,28 +284,37 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
       event.skipParsingPattern();
     }
 
-    if (!disableSkippingTimeParse && 
Objects.nonNull(dataRegionTimePartitionIdBound.get())) {
+    if (!disableCheckingDataRegionTimePartitionCovering
+        && Objects.nonNull(dataRegionTimePartitionIdBound.get())) {
       if (isDataRegionTimePartitionCoveredByTimeRange()) {
         event.skipParsingTime();
       } else {
         // Since we only record the upper and lower bounds that time partition 
has ever reached, if
         // the time partition cannot be covered by the time range during 
query, it will not be
         // possible later.
-        disableSkippingTimeParse = true;
+        disableCheckingDataRegionTimePartitionCovering = true;
       }
     }
 
     // 1. Check if time parsing is necessary. If not, it means that the 
timestamps of the data
     // contained in this event are definitely within the time range [start 
time, end time].
-    // Otherwise,
-    // 2. Check if the timestamps of the data contained in this event 
intersect with the time range.
-    // If there is no intersection, it indicates that this data will be 
filtered out by the
-    // extractor, and the extract process is skipped.
-    if (!event.shouldParseTime() || 
event.getEvent().mayEventTimeOverlappedWithTimeRange()) {
+    // 2. Check if the event's data timestamps may intersect with the time 
range. If not, it means
+    // that the data timestamps of this event are definitely not within the 
time range.
+    // 3. Check if pattern parsing is necessary. If not, it means that the 
paths of the data
+    // contained in this event are definitely covered by the pattern.
+    // 4. Check if the event's data paths may intersect with the pattern. If 
not, it means that the
+    // data of this event is definitely not overlapped with the pattern.
+    if ((!event.shouldParseTime() || 
event.getEvent().mayEventTimeOverlappedWithTimeRange())
+        && (!event.shouldParsePattern() || 
event.getEvent().mayEventPathsOverlappedWithPattern())) {
       if (sloppyTimeRange) {
         // only skip parsing time for events whose data timestamps may 
intersect with the time range
         event.skipParsingTime();
       }
+      if (sloppyPattern) {
+        // only skip parsing pattern for events whose data paths may intersect 
with the pattern
+        event.skipParsingPattern();
+      }
+
       doExtract(event);
     } else {
       
event.decreaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName(), 
false);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
index 1e7587153fb..8649183c1ec 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
@@ -22,9 +22,12 @@ package org.apache.iotdb.db.pipe.pattern;
 import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.pattern.PrefixPipePattern;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
+import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -47,6 +50,27 @@ import java.util.stream.IntStream;
 
 public class CachedSchemaPatternMatcherTest {
 
+  private static class MockedPipeRealtimeEvent extends PipeRealtimeEvent {
+
+    public MockedPipeRealtimeEvent(
+        EnrichedEvent event,
+        TsFileEpoch tsFileEpoch,
+        Map<String, String[]> device2Measurements,
+        PipePattern pattern) {
+      super(event, tsFileEpoch, device2Measurements, pattern);
+    }
+
+    @Override
+    public boolean shouldParseTime() {
+      return false;
+    }
+
+    @Override
+    public boolean shouldParsePattern() {
+      return false;
+    }
+  }
+
   private CachedSchemaPatternMatcher matcher;
   private ExecutorService executorService;
   private List<PipeRealtimeDataRegionExtractor> extractors;
@@ -125,14 +149,14 @@ public class CachedSchemaPatternMatcherTest {
     long totalTime = 0;
     for (int i = 0; i < epochNum; i++) {
       for (int j = 0; j < deviceNum; j++) {
-        PipeRealtimeEvent event =
-            new PipeRealtimeEvent(
+        MockedPipeRealtimeEvent event =
+            new MockedPipeRealtimeEvent(
                 null, null, Collections.singletonMap("root." + i, 
measurements), null);
         long startTime = System.currentTimeMillis();
         matcher.match(event).forEach(extractor -> extractor.extract(event));
         totalTime += (System.currentTimeMillis() - startTime);
       }
-      PipeRealtimeEvent event = new PipeRealtimeEvent(null, null, deviceMap, 
null);
+      MockedPipeRealtimeEvent event = new MockedPipeRealtimeEvent(null, null, 
deviceMap, null);
       long startTime = System.currentTimeMillis();
       matcher.match(event).forEach(extractor -> extractor.extract(event));
       totalTime += (System.currentTimeMillis() - startTime);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index 99efcd48d61..1bf147e998b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -84,6 +84,9 @@ public class PipeExtractorConstant {
   public static final String EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE = 
"batch";
   public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_KEY = 
"extractor.realtime.loose-range";
   public static final String SOURCE_REALTIME_LOOSE_RANGE_KEY = 
"source.realtime.loose-range";
+  public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_TIME_VALUE = 
"time";
+  public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_PATH_VALUE = 
"path";
+  public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_DEFAULT_VALUE = "";
 
   public static final String EXTRACTOR_START_TIME_KEY = "extractor.start-time";
   public static final String SOURCE_START_TIME_KEY = "source.start-time";
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 8de596a5447..6d68b607294 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -322,6 +322,8 @@ public abstract class EnrichedEvent implements Event {
 
   public abstract boolean mayEventTimeOverlappedWithTimeRange();
 
+  public abstract boolean mayEventPathsOverlappedWithPattern();
+
   public void setCommitterKeyAndCommitId(final String committerKey, final long 
commitId) {
     this.committerKey = committerKey;
     this.commitId = commitId;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
index f130a83c0ee..1c6dd689e64 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
@@ -63,6 +63,11 @@ public abstract class PipeSnapshotEvent extends 
EnrichedEvent implements Seriali
     return true;
   }
 
+  @Override
+  public boolean mayEventPathsOverlappedWithPattern() {
+    return true;
+  }
+
   /////////////////////////////// Type parsing ///////////////////////////////
 
   public String toSealTypeString() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
index c553e6ce996..4a7a25dbd23 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
@@ -71,6 +71,11 @@ public abstract class PipeWritePlanEvent extends 
EnrichedEvent implements Serial
     return true;
   }
 
+  @Override
+  public boolean mayEventPathsOverlappedWithPattern() {
+    return true;
+  }
+
   /////////////////////////// Object ///////////////////////////
 
   @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
index 4f00c89c5e6..700f8f16387 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
@@ -83,4 +83,9 @@ public class ProgressReportEvent extends EnrichedEvent {
   public boolean mayEventTimeOverlappedWithTimeRange() {
     return true;
   }
+
+  @Override
+  public boolean mayEventPathsOverlappedWithPattern() {
+    return true;
+  }
 }


Reply via email to