This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f196cf20900 Pipe: Filter devices by pattern before reading device metadata from TsFile (#12765)
f196cf20900 is described below

commit f196cf2090033ba6148c36db5a24411f933b3138
Author: Zikun Ma <[email protected]>
AuthorDate: Wed Jun 19 23:51:12 2024 +0800

    Pipe: Filter devices by pattern before reading device metadata from TsFile (#12765)
    
    Before this change, we read the metadata of all devices and measurements when constructing a TsFileInsertionDataContainer. This is both time-consuming and wasteful of memory when only a few devices match the pattern. This PR filters the devices by pattern before reading the metadata of devices and measurements, saving memory and I/O cost when there are many unmatched devices.
    
    Note: this only takes effect when TsFile metadata is not cached (metadata may be left uncached because of the high memory usage of the TsFile metadata cache). Cached metadata cannot be filtered, because it must stay complete so that pipes with arbitrary patterns can use it.
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../tsfile/TsFileInsertionDataContainer.java       | 74 +++++++++++++++++++++-
 1 file changed, 72 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 03e61cf2809..8030cce9324 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
@@ -50,11 +51,13 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.Set;
 
 public class TsFileInsertionDataContainer implements AutoCloseable {
 
@@ -77,6 +80,7 @@ public class TsFileInsertionDataContainer implements AutoCloseable {
 
   private boolean shouldParsePattern = false;
 
+  @TestOnly
   public TsFileInsertionDataContainer(
       final File tsFile, final PipePattern pattern, final long startTime, 
final long endTime)
       throws IOException {
@@ -122,15 +126,20 @@ public class TsFileInsertionDataContainer implements AutoCloseable {
         deviceIsAlignedMap = readDeviceIsAlignedMap();
         memoryRequiredInBytes += 
PipeMemoryWeighUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);
 
-        measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
+        // Filter devices that may overlap with pattern first
+        // to avoid reading all time-series of all devices.
+        final Set<IDeviceID> devices = 
filterDevicesByPattern(deviceIsAlignedMap.keySet());
+
+        measurementDataTypeMap = readFilteredFullPathDataTypeMap(devices);
         memoryRequiredInBytes += 
PipeMemoryWeighUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
 
-        deviceMeasurementsMap = 
tsFileSequenceReader.getDeviceMeasurementsMap();
+        deviceMeasurementsMap = readFilteredDeviceMeasurementsMap(devices);
         memoryRequiredInBytes +=
             
PipeMemoryWeighUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
       }
       allocatedMemoryBlock = 
PipeResourceManager.memory().forceAllocate(memoryRequiredInBytes);
 
+      // Filter again to get the final deviceMeasurementsMap that exactly 
matches the pattern.
       deviceMeasurementsMapIterator =
           
filterDeviceMeasurementsMapByPattern(deviceMeasurementsMap).entrySet().iterator();
 
@@ -198,6 +207,67 @@ public class TsFileInsertionDataContainer implements AutoCloseable {
     return deviceIsAlignedResultMap;
   }
 
+  private Set<IDeviceID> filterDevicesByPattern(final Set<IDeviceID> devices) {
+    if (Objects.isNull(pattern) || pattern.isRoot()) {
+      return devices;
+    }
+
+    final Set<IDeviceID> filteredDevices = new HashSet<>();
+    for (final IDeviceID device : devices) {
+      final String deviceId = ((PlainDeviceID) device).toStringID();
+      if (pattern.coversDevice(deviceId) || 
pattern.mayOverlapWithDevice(deviceId)) {
+        filteredDevices.add(device);
+      }
+    }
+    return filteredDevices;
+  }
+
+  /**
+   * This method is similar to {@link 
TsFileSequenceReader#getFullPathDataTypeMap()}, but only reads
+   * the given devices.
+   */
+  private Map<String, TSDataType> readFilteredFullPathDataTypeMap(final 
Set<IDeviceID> devices)
+      throws IOException {
+    final Map<String, TSDataType> result = new HashMap<>();
+
+    for (IDeviceID device : devices) {
+      tsFileSequenceReader
+          .readDeviceMetadata(device)
+          .values()
+          .forEach(
+              timeseriesMetadata ->
+                  result.put(
+                      ((PlainDeviceID) device).toStringID()
+                          + "."
+                          + timeseriesMetadata.getMeasurementId(),
+                      timeseriesMetadata.getTsDataType()));
+    }
+
+    return result;
+  }
+
+  /**
+   * This method is similar to {@link 
TsFileSequenceReader#getDeviceMeasurementsMap()}, but only
+   * reads the given devices.
+   */
+  private Map<IDeviceID, List<String>> readFilteredDeviceMeasurementsMap(
+      final Set<IDeviceID> devices) throws IOException {
+    final Map<IDeviceID, List<String>> result = new HashMap<>();
+
+    for (IDeviceID device : devices) {
+      tsFileSequenceReader
+          .readDeviceMetadata(device)
+          .values()
+          .forEach(
+              timeseriesMetadata ->
+                  result
+                      .computeIfAbsent(device, d -> new ArrayList<>())
+                      .add(timeseriesMetadata.getMeasurementId()));
+    }
+
+    return result;
+  }
+
   /**
    * @return {@link TabletInsertionEvent} in a streaming way
    */

Reply via email to