This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1cd5ecde5f2 Pipe: Support "source.realtime.loose-range" = "path" in iotdb-source (#12751)
1cd5ecde5f2 is described below
commit 1cd5ecde5f2fd060d9282b6d93331956a55631c0
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Jun 18 16:54:50 2024 +0800
Pipe: Support "source.realtime.loose-range" = "path" in iotdb-source (#12751)
---
.../pipe/it/autocreate/IoTDBPipeExtractorIT.java | 60 ++++++++++++++-
.../db/pipe/event/UserDefinedEnrichedEvent.java | 5 ++
.../event/common/heartbeat/PipeHeartbeatEvent.java | 5 ++
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 86 ++++++++++++++--------
.../common/tablet/PipeRawTabletInsertionEvent.java | 6 ++
.../event/common/terminate/PipeTerminateEvent.java | 5 ++
.../common/tsfile/PipeTsFileInsertionEvent.java | 47 +++++++-----
.../db/pipe/event/realtime/PipeRealtimeEvent.java | 29 +++++---
.../PipeHistoricalDataRegionTsFileExtractor.java | 7 +-
.../realtime/PipeRealtimeDataRegionExtractor.java | 71 +++++++++++++-----
.../pattern/CachedSchemaPatternMatcherTest.java | 30 +++++++-
.../config/constant/PipeExtractorConstant.java | 3 +
.../iotdb/commons/pipe/event/EnrichedEvent.java | 2 +
.../commons/pipe/event/PipeSnapshotEvent.java | 5 ++
.../commons/pipe/event/PipeWritePlanEvent.java | 5 ++
.../commons/pipe/event/ProgressReportEvent.java | 5 ++
16 files changed, 288 insertions(+), 83 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
index f83699f30b7..8ce885935b8 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
@@ -904,7 +904,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
}
@Test
- public void testLooseRange() throws Exception {
+ public void testHistoryLooseRange() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
final String receiverIp = receiverDataNode.getIp();
@@ -964,6 +964,64 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
}
}
+ @Test
+ public void testRealtimeLooseRange() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("source.path", "root.db.d1.at1");
+ extractorAttributes.put("source.inclusion", "data.insert");
+ extractorAttributes.put("source.realtime.loose-range", "time, path");
+ extractorAttributes.put("source.start-time", "2000");
+ extractorAttributes.put("source.end-time", "10000");
+ extractorAttributes.put("source.realtime.mode", "batch");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1 (time, at1, at2)" + " values (1000, 1,
2), (3000, 3, 4)",
+ "flush"))) {
+ return;
+ }
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db1.d1 (time, at1, at2)" + " values (1000, 1,
2), (3000, 3, 4)",
+ "flush"))) {
+ return;
+ }
+
+ TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1 (time, at1)" + " values (5000, 1),
(16000, 3)",
+ "insert into root.db.d1 (time, at1, at2)" + " values (5001, 1,
2), (6001, 3, 4)",
+ "flush"));
+ }
+ }
+
private void assertTimeseriesCountOnReceiver(BaseEnv receiverEnv, int count)
{
TestUtils.assertDataEventuallyOnEnv(
receiverEnv, "count timeseries", "count(timeseries),",
Collections.singleton(count + ","));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
index 4803158db2e..82829a4542b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
@@ -92,4 +92,9 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent {
public boolean mayEventTimeOverlappedWithTimeRange() {
return enrichedEvent.mayEventTimeOverlappedWithTimeRange();
}
+
+ @Override
+ public boolean mayEventPathsOverlappedWithPattern() {
+ return enrichedEvent.mayEventPathsOverlappedWithPattern();
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index f82d799b7d3..d3830b6dcf0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -121,6 +121,11 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
return true;
}
+ @Override
+ public boolean mayEventPathsOverlappedWithPattern() {
+ return true;
+ }
+
/////////////////////////////// Whether to print
///////////////////////////////
public boolean isShouldPrintMessage() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index b9ddfd0d55d..abb0fcb572c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -205,29 +204,72 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
@Override
public boolean mayEventTimeOverlappedWithTimeRange() {
try {
- final InsertNode insertNode = getInsertNode();
+ final InsertNode insertNode = getInsertNodeViaCacheIfPossible();
+ if (Objects.isNull(insertNode)) {
+ return true;
+ }
+
if (insertNode instanceof InsertRowNode) {
final long timestamp = ((InsertRowNode) insertNode).getTime();
return startTime <= timestamp && timestamp <= endTime;
- } else if (insertNode instanceof InsertTabletNode) {
+ }
+
+ if (insertNode instanceof InsertTabletNode) {
final long[] timestamps = ((InsertTabletNode) insertNode).getTimes();
if (Objects.isNull(timestamps) || timestamps.length == 0) {
return false;
}
// We assume that `timestamps` is ordered.
return startTime <= timestamps[timestamps.length - 1] && timestamps[0]
<= endTime;
- } else if (insertNode instanceof InsertRowsNode) {
- for (final InsertRowNode node : ((InsertRowsNode)
insertNode).getInsertRowNodeList()) {
- final long timestamp = node.getTime();
- if (startTime <= timestamp && timestamp <= endTime) {
- return true;
- }
- }
- return false;
- } else {
- throw new UnSupportedDataTypeException(
- String.format("InsertNode type %s is not supported.",
insertNode.getClass().getName()));
}
+
+ if (insertNode instanceof InsertRowsNode) {
+ return ((InsertRowsNode) insertNode)
+ .getInsertRowNodeList().stream()
+ .anyMatch(
+ insertRowNode -> {
+ final long timestamp = insertRowNode.getTime();
+ return startTime <= timestamp && timestamp <= endTime;
+ });
+ }
+
+ return true;
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Exception occurred when determining the event time of
PipeInsertNodeTabletInsertionEvent({}) overlaps with the time range: [{}, {}].
Returning true to ensure data integrity.",
+ this,
+ startTime,
+ endTime,
+ e);
+ return true;
+ }
+ }
+
+ @Override
+ public boolean mayEventPathsOverlappedWithPattern() {
+ try {
+ final InsertNode insertNode = getInsertNodeViaCacheIfPossible();
+ if (Objects.isNull(insertNode)) {
+ return true;
+ }
+
+ if (insertNode instanceof InsertRowNode || insertNode instanceof
InsertTabletNode) {
+ final PartialPath devicePartialPath = insertNode.getDevicePath();
+ return Objects.isNull(devicePartialPath)
+ ||
pipePattern.mayOverlapWithDevice(devicePartialPath.getFullPath());
+ }
+
+ if (insertNode instanceof InsertRowsNode) {
+ return ((InsertRowsNode) insertNode)
+ .getInsertRowNodeList().stream()
+ .anyMatch(
+ insertRowNode ->
+ Objects.isNull(insertRowNode.getDevicePath())
+ || pipePattern.mayOverlapWithDevice(
+ insertRowNode.getDevicePath().getFullPath()));
+ }
+
+ return true;
} catch (final Exception e) {
LOGGER.warn(
"Exception occurred when determining the event time of
PipeInsertNodeTabletInsertionEvent({}) overlaps with the time range: [{}, {}].
Returning true to ensure data integrity.",
@@ -318,22 +360,6 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
/////////////////////////// parsePatternOrTime ///////////////////////////
- @Override
- public boolean shouldParsePattern() {
- final InsertNode node = getInsertNodeViaCacheIfPossible();
- return super.shouldParsePattern()
- && Objects.nonNull(pipePattern)
- && (Objects.isNull(node)
- || (node.getType() == PlanNodeType.INSERT_ROWS
- ? ((InsertRowsNode) node)
- .getInsertRowNodeList().stream()
- .anyMatch(
- insertRowNode ->
- !pipePattern.coversDevice(
-
insertRowNode.getDevicePath().getFullPath()))
- :
!pipePattern.coversDevice(node.getDevicePath().getFullPath())));
- }
-
public List<PipeRawTabletInsertionEvent> toRawTabletInsertionEvents() {
final List<PipeRawTabletInsertionEvent> events =
convertToTablets().stream()
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 239f73b3c32..9769299ae68 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -191,6 +191,12 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
return startTime <= timestamps[timestamps.length - 1] && timestamps[0] <=
endTime;
}
+ @Override
+ public boolean mayEventPathsOverlappedWithPattern() {
+ final String deviceId = getDeviceId();
+ return Objects.isNull(deviceId) ||
pipePattern.mayOverlapWithDevice(deviceId);
+ }
+
public void markAsNeedToReport() {
this.needToReport = true;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index cc423f1bf3d..146873ed3d2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -84,6 +84,11 @@ public class PipeTerminateEvent extends EnrichedEvent {
return true;
}
+ @Override
+ public boolean mayEventPathsOverlappedWithPattern() {
+ return true;
+ }
+
@Override
public void reportProgress() {
PipeAgent.task().markCompleted(pipeName, dataRegionId);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index a7c0cc5c255..b7a883712df 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -33,12 +34,17 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
public class PipeTsFileInsertionEvent extends EnrichedEvent implements
TsFileInsertionEvent {
@@ -277,28 +283,35 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
: resource.getFileStartTime() <= endTime;
}
- /////////////////////////// TsFileInsertionEvent ///////////////////////////
-
@Override
- public boolean shouldParseTimeOrPattern() {
- boolean shouldParseTimeOrPattern = false;
+ public boolean mayEventPathsOverlappedWithPattern() {
+ if (!resource.isClosed()) {
+ return true;
+ }
+
try {
- shouldParseTimeOrPattern = super.shouldParseTimeOrPattern();
- return shouldParseTimeOrPattern;
- } finally {
- // Super method will call shouldParsePattern() and then init
dataContainer at
- // shouldParsePattern(). If shouldParsePattern() returns false,
dataContainer will
- // not be used, so we need to close the resource here.
- if (!shouldParseTimeOrPattern) {
- close();
- }
+ final Map<IDeviceID, Boolean> deviceIsAlignedMap =
+ PipeResourceManager.tsfile()
+ .getDeviceIsAlignedMapFromCache(
+
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
+ final Set<IDeviceID> deviceSet =
+ Objects.nonNull(deviceIsAlignedMap) ? deviceIsAlignedMap.keySet() :
resource.getDevices();
+ return deviceSet.stream()
+ .anyMatch(
+ // TODO: use IDeviceID
+ deviceID ->
+ pipePattern.mayOverlapWithDevice(((PlainDeviceID)
deviceID).toStringID()));
+ } catch (final IOException e) {
+ LOGGER.warn(
+ "Pipe {}: failed to get devices from TsFile {}, extract it anyway",
+ pipeName,
+ resource.getTsFilePath(),
+ e);
+ return true;
}
}
- @Override
- public boolean shouldParsePattern() {
- return super.shouldParsePattern() &&
initDataContainer().shouldParsePattern();
- }
+ /////////////////////////// TsFileInsertionEvent ///////////////////////////
@Override
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index ce8e7d9f0fa..7e12a909b1f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -129,10 +129,6 @@ public class PipeRealtimeEvent extends EnrichedEvent {
return event.getProgressIndex();
}
- /**
- * If pipe's pattern is database-level, then no need to parse event by
pattern cause pipes are
- * data-region-level.
- */
@Override
public void skipParsingPattern() {
event.skipParsingPattern();
@@ -143,6 +139,26 @@ public class PipeRealtimeEvent extends EnrichedEvent {
event.skipParsingTime();
}
+ @Override
+ public boolean shouldParseTime() {
+ return event.shouldParseTime();
+ }
+
+ @Override
+ public boolean shouldParsePattern() {
+ return event.shouldParsePattern();
+ }
+
+ @Override
+ public boolean mayEventTimeOverlappedWithTimeRange() {
+ return event.mayEventTimeOverlappedWithTimeRange();
+ }
+
+ @Override
+ public boolean mayEventPathsOverlappedWithPattern() {
+ return event.mayEventPathsOverlappedWithPattern();
+ }
+
@Override
public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
@@ -167,11 +183,6 @@ public class PipeRealtimeEvent extends EnrichedEvent {
return event.isGeneratedByPipe();
}
- @Override
- public boolean mayEventTimeOverlappedWithTimeRange() {
- return event.mayEventTimeOverlappedWithTimeRange();
- }
-
@Override
public String toString() {
return "PipeRealtimeEvent{"
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 237dace7c02..1f36e98bec0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -109,8 +109,8 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time
private long historicalDataExtractionTimeLowerBound; // Arrival time
- private boolean sloppyPattern;
private boolean sloppyTimeRange; // true to disable time range filter after
extraction
+ private boolean sloppyPattern; // true to disable pattern filter after
extraction
private Pair<Boolean, Boolean> listeningOptionPair;
private boolean shouldExtractInsertion;
@@ -142,10 +142,9 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE)
.split(","))
.map(String::trim)
+ .filter(s -> !s.isEmpty())
.map(String::toLowerCase)
.collect(Collectors.toSet());
- // Avoid empty string
- sloppyOptionSet.remove("");
sloppyTimeRange =
sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE);
sloppyPattern =
sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE);
if (!sloppyOptionSet.isEmpty()) {
@@ -183,6 +182,8 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
EXTRACTOR_END_TIME_KEY,
historicalDataExtractionEndTime));
}
+ } catch (final PipeParameterNotValidException e) {
+ throw e;
} catch (final Exception e) {
// compatible with the current validation framework
throw new PipeParameterNotValidException(e.getMessage());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index c26db376da0..89e42ea0974 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -54,6 +54,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -61,7 +62,10 @@ import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_PATH_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_TIME_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY;
@@ -87,7 +91,7 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
protected long realtimeDataExtractionStartTime = Long.MIN_VALUE; // Event
time
protected long realtimeDataExtractionEndTime = Long.MAX_VALUE; // Event time
- private boolean disableSkippingTimeParse = false;
+ private boolean disableCheckingDataRegionTimePartitionCovering = false;
private long startTimePartitionIdLowerBound; // calculated by
realtimeDataExtractionStartTime
private long endTimePartitionIdUpperBound; // calculated by
realtimeDataExtractionEndTime
@@ -102,6 +106,7 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
private boolean shouldTransferModFile; // Whether to transfer mods
private boolean sloppyTimeRange; // true to disable time range filter after
extraction
+ private boolean sloppyPattern; // true to disable pattern filter after
extraction
// This queue is used to store pending events extracted by the method
extract(). The method
// supply() will poll events from this queue and send them to the next pipe
plugin.
@@ -142,10 +147,32 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
EXTRACTOR_END_TIME_KEY,
realtimeDataExtractionEndTime));
}
+ } catch (final PipeParameterNotValidException e) {
+ throw e;
} catch (final Exception e) {
// compatible with the current validation framework
throw new PipeParameterNotValidException(e.getMessage());
}
+
+ final Set<String> sloppyOptionSet =
+ Arrays.stream(
+ parameters
+ .getStringOrDefault(
+ Arrays.asList(
+ EXTRACTOR_REALTIME_LOOSE_RANGE_KEY,
SOURCE_REALTIME_LOOSE_RANGE_KEY),
+ EXTRACTOR_REALTIME_LOOSE_RANGE_DEFAULT_VALUE)
+ .split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .map(String::toLowerCase)
+ .collect(Collectors.toSet());
+ sloppyTimeRange =
sloppyOptionSet.remove(EXTRACTOR_REALTIME_LOOSE_RANGE_TIME_VALUE);
+ sloppyPattern =
sloppyOptionSet.remove(EXTRACTOR_REALTIME_LOOSE_RANGE_PATH_VALUE);
+ if (!sloppyOptionSet.isEmpty()) {
+ throw new PipeParameterNotValidException(
+ String.format(
+ "Parameters in set %s are not allowed in
'realtime.loose-range'", sloppyOptionSet));
+ }
}
@Override
@@ -203,18 +230,13 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY),
EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE || shouldExtractDeletion);
- sloppyTimeRange =
- Arrays.stream(
- parameters
- .getStringOrDefault(
- Arrays.asList(
- EXTRACTOR_REALTIME_LOOSE_RANGE_KEY,
SOURCE_REALTIME_LOOSE_RANGE_KEY),
- "")
- .split(","))
- .map(String::trim)
- .map(String::toLowerCase)
- .collect(Collectors.toSet())
- .contains("time");
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "Pipe {}@{}: realtime data region extractor is initialized with
parameters: {}.",
+ pipeName,
+ dataRegionId,
+ parameters);
+ }
}
@Override
@@ -262,28 +284,37 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
event.skipParsingPattern();
}
- if (!disableSkippingTimeParse &&
Objects.nonNull(dataRegionTimePartitionIdBound.get())) {
+ if (!disableCheckingDataRegionTimePartitionCovering
+ && Objects.nonNull(dataRegionTimePartitionIdBound.get())) {
if (isDataRegionTimePartitionCoveredByTimeRange()) {
event.skipParsingTime();
} else {
// Since we only record the upper and lower bounds that time partition
has ever reached, if
// the time partition cannot be covered by the time range during
query, it will not be
// possible later.
- disableSkippingTimeParse = true;
+ disableCheckingDataRegionTimePartitionCovering = true;
}
}
// 1. Check if time parsing is necessary. If not, it means that the
timestamps of the data
// contained in this event are definitely within the time range [start
time, end time].
- // Otherwise,
- // 2. Check if the timestamps of the data contained in this event
intersect with the time range.
- // If there is no intersection, it indicates that this data will be
filtered out by the
- // extractor, and the extract process is skipped.
- if (!event.shouldParseTime() ||
event.getEvent().mayEventTimeOverlappedWithTimeRange()) {
+ // 2. Check if the event's data timestamps may intersect with the time
range. If not, it means
+ // that the data timestamps of this event are definitely not within the
time range.
+ // 3. Check if pattern parsing is necessary. If not, it means that the
paths of the data
+ // contained in this event are definitely covered by the pattern.
+ // 4. Check if the event's data paths may intersect with the pattern. If
not, it means that the
+ // data of this event is definitely not overlapped with the pattern.
+ if ((!event.shouldParseTime() ||
event.getEvent().mayEventTimeOverlappedWithTimeRange())
+ && (!event.shouldParsePattern() ||
event.getEvent().mayEventPathsOverlappedWithPattern())) {
if (sloppyTimeRange) {
// only skip parsing time for events whose data timestamps may
intersect with the time range
event.skipParsingTime();
}
+ if (sloppyPattern) {
+ // only skip parsing pattern for events whose data paths may intersect
with the pattern
+ event.skipParsingPattern();
+ }
+
doExtract(event);
} else {
event.decreaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName(),
false);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
index 1e7587153fb..8649183c1ec 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
@@ -22,9 +22,12 @@ package org.apache.iotdb.db.pipe.pattern;
import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.pattern.PrefixPipePattern;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
+import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
@@ -47,6 +50,27 @@ import java.util.stream.IntStream;
public class CachedSchemaPatternMatcherTest {
+ private static class MockedPipeRealtimeEvent extends PipeRealtimeEvent {
+
+ public MockedPipeRealtimeEvent(
+ EnrichedEvent event,
+ TsFileEpoch tsFileEpoch,
+ Map<String, String[]> device2Measurements,
+ PipePattern pattern) {
+ super(event, tsFileEpoch, device2Measurements, pattern);
+ }
+
+ @Override
+ public boolean shouldParseTime() {
+ return false;
+ }
+
+ @Override
+ public boolean shouldParsePattern() {
+ return false;
+ }
+ }
+
private CachedSchemaPatternMatcher matcher;
private ExecutorService executorService;
private List<PipeRealtimeDataRegionExtractor> extractors;
@@ -125,14 +149,14 @@ public class CachedSchemaPatternMatcherTest {
long totalTime = 0;
for (int i = 0; i < epochNum; i++) {
for (int j = 0; j < deviceNum; j++) {
- PipeRealtimeEvent event =
- new PipeRealtimeEvent(
+ MockedPipeRealtimeEvent event =
+ new MockedPipeRealtimeEvent(
null, null, Collections.singletonMap("root." + i,
measurements), null);
long startTime = System.currentTimeMillis();
matcher.match(event).forEach(extractor -> extractor.extract(event));
totalTime += (System.currentTimeMillis() - startTime);
}
- PipeRealtimeEvent event = new PipeRealtimeEvent(null, null, deviceMap,
null);
+ MockedPipeRealtimeEvent event = new MockedPipeRealtimeEvent(null, null,
deviceMap, null);
long startTime = System.currentTimeMillis();
matcher.match(event).forEach(extractor -> extractor.extract(event));
totalTime += (System.currentTimeMillis() - startTime);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index 99efcd48d61..1bf147e998b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -84,6 +84,9 @@ public class PipeExtractorConstant {
public static final String EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE =
"batch";
public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_KEY =
"extractor.realtime.loose-range";
public static final String SOURCE_REALTIME_LOOSE_RANGE_KEY =
"source.realtime.loose-range";
+ public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_TIME_VALUE =
"time";
+ public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_PATH_VALUE =
"path";
+ public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_DEFAULT_VALUE = "";
public static final String EXTRACTOR_START_TIME_KEY = "extractor.start-time";
public static final String SOURCE_START_TIME_KEY = "source.start-time";
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 8de596a5447..6d68b607294 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -322,6 +322,8 @@ public abstract class EnrichedEvent implements Event {
public abstract boolean mayEventTimeOverlappedWithTimeRange();
+ public abstract boolean mayEventPathsOverlappedWithPattern();
+
public void setCommitterKeyAndCommitId(final String committerKey, final long
commitId) {
this.committerKey = committerKey;
this.commitId = commitId;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
index f130a83c0ee..1c6dd689e64 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
@@ -63,6 +63,11 @@ public abstract class PipeSnapshotEvent extends
EnrichedEvent implements Seriali
return true;
}
+ @Override
+ public boolean mayEventPathsOverlappedWithPattern() {
+ return true;
+ }
+
/////////////////////////////// Type parsing ///////////////////////////////
public String toSealTypeString() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
index c553e6ce996..4a7a25dbd23 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
@@ -71,6 +71,11 @@ public abstract class PipeWritePlanEvent extends
EnrichedEvent implements Serial
return true;
}
+ @Override
+ public boolean mayEventPathsOverlappedWithPattern() {
+ return true;
+ }
+
/////////////////////////// Object ///////////////////////////
@Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
index 4f00c89c5e6..700f8f16387 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
@@ -83,4 +83,9 @@ public class ProgressReportEvent extends EnrichedEvent {
public boolean mayEventTimeOverlappedWithTimeRange() {
return true;
}
+
+ @Override
+ public boolean mayEventPathsOverlappedWithPattern() {
+ return true;
+ }
}