This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-realtime-loose-range in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 161583cbea96888f65dd8487aea03b20a4ba91be Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Jun 17 18:35:13 2024 +0800 Pipe: Support "source.realtime.loose-range" = "path" in iotdb-source --- .../common/tsfile/PipeTsFileInsertionEvent.java | 4 ++ .../realtime/PipeRealtimeDataRegionExtractor.java | 57 ++++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index a7c0cc5c255..8744eb6cb69 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -159,6 +159,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns return !resource.isEmpty(); } + public TsFileResource getResource() { + return resource; + } + public File getTsFile() { return tsFile; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java index 262f2d824aa..62b665441d8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java @@ -30,13 +30,18 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeTimePartitionListener; import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter; +import org.apache.iotdb.db.pipe.resource.PipeResourceManager; +import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.pipe.api.PipeExtractor; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; @@ -45,14 +50,18 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.PlainDeviceID; import org.apache.tsfile.utils.Pair; import org.checkerframework.checker.nullness.qual.NonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -307,6 +316,11 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { // only skip parsing time for events whose data timestamps may intersect with the time range event.skipParsingTime(); } + if (sloppyPattern && mayEventPathsOverlappedWithPattern(event)) { + // only skip parsing pattern for events whose data paths may intersect with the pattern + event.skipParsingPattern(); + } + doExtract(event); } else { event.decreaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName(), false); @@ -319,6 +333,49 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { } } + private boolean mayEventPathsOverlappedWithPattern(final PipeRealtimeEvent realtimeEvent) { + final EnrichedEvent event = realtimeEvent.getEvent(); + + if (event instanceof PipeInsertNodeTabletInsertionEvent) { + final String deviceId = ((PipeInsertNodeTabletInsertionEvent) event).getDeviceId(); + return Objects.isNull(deviceId) || pipePattern.mayOverlapWithDevice(deviceId); + } + + if (event instanceof PipeTsFileInsertionEvent) { + final TsFileResource resource = ((PipeTsFileInsertionEvent) event).getResource(); + if (!resource.isClosed()) { + return true; + } + + try { + final Map<IDeviceID, Boolean> deviceIsAlignedMap = + PipeResourceManager.tsfile() + .getDeviceIsAlignedMapFromCache( + PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir( + resource.getTsFile())); + final Set<IDeviceID> deviceSet = + Objects.nonNull(deviceIsAlignedMap) + ? deviceIsAlignedMap.keySet() + : resource.getDevices(); + return deviceSet.stream() + .anyMatch( + // TODO: use IDeviceID + deviceID -> + pipePattern.mayOverlapWithDevice(((PlainDeviceID) deviceID).toStringID())); + } catch (final IOException e) { + LOGGER.warn( + "Pipe {}@{}: failed to get devices from TsFile {}, extract it anyway", + pipeName, + dataRegionId, + resource.getTsFilePath(), + e); + return true; + } + } + + return false; + } + protected abstract void doExtract(final PipeRealtimeEvent event); protected void extractHeartbeat(final PipeRealtimeEvent event) {
