This is an automated email from the ASF dual-hosted git repository. vgalaxies pushed a commit to branch path-exclusion in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0e7eb29b2b0e18173b9f1ec980511bf34860e0b1 Author: VGalaxies <[email protected]> AuthorDate: Mon Oct 13 11:28:45 2025 +0800 backup --- .../TabletInsertionEventTreePatternParser.java | 39 ++++++- .../query/TsFileInsertionEventQueryParser.java | 66 ++++++++++- .../scan/TsFileInsertionEventScanParser.java | 48 +++++++- .../pipe/event/PipeTabletInsertionEventTest.java | 44 +++++++ .../pipe/event/TsFileInsertionEventParserTest.java | 126 +++++++++++++++++++++ 5 files changed, 313 insertions(+), 10 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java index 68fb0e50b95..242d6448402 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java @@ -45,6 +45,10 @@ public class TabletInsertionEventTreePatternParser extends TabletInsertionEventP private final TreePattern pattern; + // Exclusion patterns for tree model filtering + // This field is expected to be initialized by the constructor externally in upstream changes. + private List<TreePattern> exclusionPatterns; + public TabletInsertionEventTreePatternParser( final PipeTaskMeta pipeTaskMeta, final EnrichedEvent sourceEvent, @@ -92,11 +96,22 @@ public class TabletInsertionEventTreePatternParser extends TabletInsertionEventP final Integer[] originColumnIndex2FilteredColumnIndexMapperList) { final int originColumnSize = originMeasurementList.length; + // Helper to check exclusion + final java.util.function.Predicate<String> excluded = m -> isMeasurementExcluded(deviceId, m); + // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c // in this case, all data can be matched without checking the measurements if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(deviceId)) { + int filteredCount = 0; for (int i = 0; i < originColumnSize; i++) { - originColumnIndex2FilteredColumnIndexMapperList[i] = i; + final String measurement = originMeasurementList[i]; + // ignore null measurement for partial insert + if (measurement == null) { + continue; + } + if (!excluded.test(measurement)) { + originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++; + } } } @@ -113,13 +128,33 @@ public class TabletInsertionEventTreePatternParser extends TabletInsertionEventP continue; } - if (pattern.matchesMeasurement(deviceId, measurement)) { + if (pattern.matchesMeasurement(deviceId, measurement) && !excluded.test(measurement)) { originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++; } } } } + private boolean isMeasurementExcluded( + final org.apache.tsfile.file.metadata.IDeviceID device, final String measurement) { + if (Objects.isNull(exclusionPatterns) || exclusionPatterns.isEmpty()) { + return false; + } + for (final TreePattern ex : exclusionPatterns) { + if (Objects.isNull(ex)) { + continue; + } + // If the exclusion covers the device, exclude all measurements + if (ex.coversDevice(device)) { + return true; + } + if (ex.mayOverlapWithDevice(device) && ex.matchesMeasurement(device, measurement)) { + return true; + } + } + return false; + } + //////////////////////////// process //////////////////////////// @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java index d61f7a791ca..a45c06b5abc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java @@ -67,6 +67,9 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser private final Map<IDeviceID, Boolean> deviceIsAlignedMap; private final Map<String, TSDataType> measurementDataTypeMap; + // Tree exclusion patterns. Expected to be set by upstream constructors. + private List<TreePattern> exclusionPatterns; + @TestOnly public TsFileInsertionEventQueryParser( final File tsFile, @@ -176,13 +179,24 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser for (Map.Entry<IDeviceID, List<String>> entry : originalDeviceMeasurementsMap.entrySet()) { final IDeviceID deviceId = entry.getKey(); + // Skip device entirely if excluded by any exclusion pattern that covers the device + if (isDeviceExcluded(deviceId)) { + continue; + } + // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c // in this case, all data can be matched without checking the measurements if (Objects.isNull(treePattern) || treePattern.isRoot() || treePattern.coversDevice(deviceId)) { - if (!entry.getValue().isEmpty()) { - filteredDeviceMeasurementsMap.put(deviceId, entry.getValue()); + final List<String> filteredMeasurements = new ArrayList<>(); + for (final String measurement : entry.getValue()) { + if (!isMeasurementExcluded(deviceId, measurement)) { + filteredMeasurements.add(measurement); + } + } + if (!filteredMeasurements.isEmpty()) { + filteredDeviceMeasurementsMap.put(deviceId, filteredMeasurements); } } @@ -192,7 +206,8 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser final List<String> filteredMeasurements = new ArrayList<>(); for (final String measurement : entry.getValue()) { - if (treePattern.matchesMeasurement(deviceId, measurement)) { + if (treePattern.matchesMeasurement(deviceId, measurement) + && !isMeasurementExcluded(deviceId, measurement)) { filteredMeasurements.add(measurement); } } @@ -219,12 +234,20 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser private Set<IDeviceID> filterDevicesByPattern(final Set<IDeviceID> devices) { if (Objects.isNull(treePattern) || treePattern.isRoot()) { - return devices; + // If only exclusion exists, filter devices that are fully excluded + final Set<IDeviceID> filtered = new HashSet<>(); + for (final IDeviceID device : devices) { + if (!isDeviceExcluded(device)) { + filtered.add(device); + } + } + return filtered; } final Set<IDeviceID> filteredDevices = new HashSet<>(); for (final IDeviceID device : devices) { - if (treePattern.coversDevice(device) || treePattern.mayOverlapWithDevice(device)) { + if ((treePattern.coversDevice(device) || treePattern.mayOverlapWithDevice(device)) + && !isDeviceExcluded(device)) { filteredDevices.add(device); } } @@ -391,6 +414,39 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser return tabletInsertionIterable; } + private boolean isDeviceExcluded(final IDeviceID device) { + if (Objects.isNull(exclusionPatterns) || exclusionPatterns.isEmpty()) { + return false; + } + for (final TreePattern ex : exclusionPatterns) { + if (Objects.isNull(ex)) { + continue; + } + if (ex.coversDevice(device)) { + return true; + } + } + return false; + } + + private boolean isMeasurementExcluded(final IDeviceID device, final String measurement) { + if (Objects.isNull(exclusionPatterns) || exclusionPatterns.isEmpty()) { + return false; + } + for (final TreePattern ex : exclusionPatterns) { + if (Objects.isNull(ex)) { + continue; + } + if (ex.coversDevice(device)) { + return true; + } + if (ex.mayOverlapWithDevice(device) && ex.matchesMeasurement(device, measurement)) { + return true; + } + } + return false; + } + @Override public void close() { try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index 44941e34c3a..0907ee4193f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -73,6 +73,9 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { private final PipeMemoryBlock allocatedMemoryBlockForBatchData; private final PipeMemoryBlock allocatedMemoryBlockForChunk; + // Tree exclusion patterns. Expected to be set by upstream constructors. + private List<TreePattern> exclusionPatterns; + private boolean currentIsMultiPage; private IDeviceID currentDevice; private boolean currentIsAligned; @@ -424,7 +427,9 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { break; } - if (!treePattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) { + // Inclusion check first, then exclusion + if (!treePattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID()) + || isMeasurementExcluded(currentDevice, chunkHeader.getMeasurementID())) { tsFileSequenceReader.position( tsFileSequenceReader.position() + chunkHeader.getDataSize()); break; @@ -456,7 +461,8 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { chunkHeader = tsFileSequenceReader.readChunkHeader(marker); if (Objects.isNull(currentDevice) - || !treePattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) { + || !treePattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID()) + || isMeasurementExcluded(currentDevice, chunkHeader.getMeasurementID())) { tsFileSequenceReader.position( tsFileSequenceReader.position() + chunkHeader.getDataSize()); break; @@ -523,7 +529,10 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { isMultiPageList.clear(); measurementIndexMap.clear(); final IDeviceID deviceID = tsFileSequenceReader.readChunkGroupHeader().getDeviceID(); - currentDevice = treePattern.mayOverlapWithDevice(deviceID) ? deviceID : null; + currentDevice = + (treePattern.mayOverlapWithDevice(deviceID) && !isDeviceExcluded(deviceID)) + ? deviceID + : null; break; case MetaMarker.OPERATION_INDEX_RANGE: tsFileSequenceReader.readPlanIndex(); @@ -556,6 +565,39 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { return false; } + private boolean isDeviceExcluded(final IDeviceID device) { + if (Objects.isNull(exclusionPatterns) || exclusionPatterns.isEmpty()) { + return false; + } + for (final TreePattern ex : exclusionPatterns) { + if (Objects.isNull(ex)) { + continue; + } + if (ex.coversDevice(device)) { + return true; + } + } + return false; + } + + private boolean isMeasurementExcluded(final IDeviceID device, final String measurement) { + if (Objects.isNull(exclusionPatterns) || exclusionPatterns.isEmpty()) { + return false; + } + for (final TreePattern ex : exclusionPatterns) { + if (Objects.isNull(ex)) { + continue; + } + if (ex.coversDevice(device)) { + return true; + } + if (ex.mayOverlapWithDevice(device) && ex.matchesMeasurement(device, measurement)) { + return true; + } + } + return false; + } + @Override public void close() { super.close(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java index 8516a9900e6..7f0ed35bed9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java @@ -21,7 +21,9 @@ package org.apache.iotdb.db.pipe.event; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixTreePattern; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTreePatternParser; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; @@ -33,13 +35,17 @@ import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.lang.reflect.Field; import java.time.LocalDate; import java.util.Arrays; +import java.util.Collections; +import java.util.List; public class PipeTabletInsertionEventTest { @@ -394,4 +400,42 @@ public class PipeTabletInsertionEventTest { event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 115L, Long.MAX_VALUE); Assert.assertFalse(event.mayEventTimeOverlappedWithTimeRange()); } + + private void setExclusions(Object parser, List<TreePattern> exclusions) throws Exception { + final Field f = + parser + .getClass() + .getDeclaredField("exclusionPatterns"); // present per data structure assumption + f.setAccessible(true); + f.set(parser, exclusions); + } + + @Test + public void testExclusionFilteringInTabletParser() throws Exception { + // Include all measurements under device + final PrefixTreePattern includePattern = new PrefixTreePattern("root.sg.d1"); + + // Exclude a single measurement s1 + final TreePattern excludeS1 = new PrefixTreePattern("root.sg.d1.s1"); + + final TabletInsertionEventTreePatternParser parser = + new TabletInsertionEventTreePatternParser(insertTabletNode, includePattern); + setExclusions(parser, Collections.singletonList(excludeS1)); + + final Tablet filtered = parser.convertToTablet(); + // Ensure s1 is filtered out + for (final IMeasurementSchema s : filtered.getSchemas()) { + Assert.assertNotEquals("s1", s.getMeasurementName()); + } + Assert.assertEquals("All except s1", schemas.length - 1, filtered.getSchemas().size()); + + // Exclude entire device + final TreePattern excludeDevice = new IoTDBTreePattern("root.sg.d1.**"); + final TabletInsertionEventTreePatternParser parser2 = + new TabletInsertionEventTreePatternParser(insertTabletNode, includePattern); + setExclusions(parser2, Collections.singletonList(excludeDevice)); + + final Tablet filtered2 = parser2.convertToTablet(); + Assert.assertEquals(0, filtered2.getSchemas().size()); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index 9c58ae9f0f6..90c294677ed 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -648,4 +648,130 @@ public class TsFileInsertionEventParserTest { } return count; } + + private void setExclusions(Object parser, List<TreePattern> exclusions) throws Exception { + final java.lang.reflect.Field f = + parser.getClass().getDeclaredField("exclusionPatterns"); // per assumption + f.setAccessible(true); + f.set(parser, exclusions); + } + + private int countPoints(final TsFileInsertionEventParser parser) throws IOException { + final AtomicInteger count = new AtomicInteger(0); + parser + .toTabletInsertionEvents() + .forEach( + event -> + event.processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + count.addAndGet(getNonNullSize(row)); + } catch (final IOException e) { + throw new RuntimeException(e); + } + })); + return count.get(); + } + + private File generateSimpleTsFile(final String fileName) throws Exception { + final File tsFile = new File(fileName); + if (tsFile.exists()) { + tsFile.delete(); + } + + final List<IMeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); + + final Tablet t = new Tablet("root.sg.ex", schemaList, 3); + t.addTimestamp(0, 1); + t.addTimestamp(1, 2); + t.addTimestamp(2, 3); + t.addValue("s1", 0, 10L); + t.addValue("s2", 0, 20L); + t.addValue("s1", 1, 11L); + t.addValue("s2", 1, 21L); + t.addValue("s1", 2, 12L); + t.addValue("s2", 2, 22L); + + try (final TsFileWriter writer = new TsFileWriter(tsFile)) { + writer.registerTimeseries(new PartialPath("root.sg.ex.s1"), schemaList.get(0)); + writer.registerTimeseries(new PartialPath("root.sg.ex.s2"), schemaList.get(1)); + writer.writeAligned(t); + writer.flush(); + } + + return tsFile; + } + + @Test + public void testExclusionPatternsOnTsFileParsers() throws Exception { + final File tsFile = generateSimpleTsFile("simple-ex.tsfile"); + + final PipeTsFileInsertionEvent tsFileInsertionEvent = + new PipeTsFileInsertionEvent( + false, + "", + new TsFileResource(tsFile), + null, + true, + false, + false, + null, + null, + 0, + null, + null, + null, + null, + null, + null, + true, + Long.MIN_VALUE, + Long.MAX_VALUE); + + final TreePattern includePattern = new IoTDBTreePattern("root.sg.ex.**"); + + // Query parser tests + try (final TsFileInsertionEventQueryParser queryParser = + new TsFileInsertionEventQueryParser( + tsFile, includePattern, Long.MIN_VALUE, Long.MAX_VALUE, tsFileInsertionEvent)) { + Assert.assertEquals(6, countPoints(queryParser)); + + // Exclude one measurement + setExclusions(queryParser, Arrays.asList(new IoTDBTreePattern("root.sg.ex.s1"))); + Assert.assertEquals(3, countPoints(queryParser)); + } + + try (final TsFileInsertionEventQueryParser queryParser2 = + new TsFileInsertionEventQueryParser( + tsFile, includePattern, Long.MIN_VALUE, Long.MAX_VALUE, tsFileInsertionEvent)) { + // Exclude entire device + setExclusions(queryParser2, Arrays.asList(new IoTDBTreePattern("root.sg.ex.**"))); + Assert.assertEquals(0, countPoints(queryParser2)); + } + + // Scan parser tests + try (final TsFileInsertionEventScanParser scanParser = + new TsFileInsertionEventScanParser( + tsFile, includePattern, Long.MIN_VALUE, Long.MAX_VALUE, null, tsFileInsertionEvent)) { + Assert.assertEquals(6, countPoints(scanParser)); + + // Exclude one measurement + setExclusions(scanParser, Arrays.asList(new IoTDBTreePattern("root.sg.ex.s1"))); + Assert.assertEquals(3, countPoints(scanParser)); + } + + try (final TsFileInsertionEventScanParser scanParser2 = + new TsFileInsertionEventScanParser( + tsFile, includePattern, Long.MIN_VALUE, Long.MAX_VALUE, null, tsFileInsertionEvent)) { + setExclusions(scanParser2, Arrays.asList(new IoTDBTreePattern("root.sg.ex.**"))); + Assert.assertEquals(0, countPoints(scanParser2)); + } + + if (tsFile.exists()) { + tsFile.delete(); + } + } }
