This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f196cf20900 Pipe: Filter devices by pattern before reading device
metadata from TsFile (#12765)
f196cf20900 is described below
commit f196cf2090033ba6148c36db5a24411f933b3138
Author: Zikun Ma <[email protected]>
AuthorDate: Wed Jun 19 23:51:12 2024 +0800
Pipe: Filter devices by pattern before reading device metadata from TsFile
(#12765)
Currently, we read the metadata of all devices and measurements when
constructing a TsFileInsertionDataContainer. This is time-consuming and
wastes memory when only a few devices match the pattern. This PR filters the
devices by pattern before reading the metadata of devices and measurements,
saving memory and I/O when there are many unmatched devices.
Note: this optimization only applies when TsFile metadata is not cached
(which happens when the TsFile metadata cache's memory usage is high),
because cached metadata cannot be filtered: it must remain complete so that
pipes with arbitrary patterns can use it.
---------
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../tsfile/TsFileInsertionDataContainer.java | 74 +++++++++++++++++++++-
1 file changed, 72 insertions(+), 2 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 03e61cf2809..8030cce9324 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
@@ -50,11 +51,13 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
+import java.util.Set;
public class TsFileInsertionDataContainer implements AutoCloseable {
@@ -77,6 +80,7 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
private boolean shouldParsePattern = false;
+ @TestOnly
public TsFileInsertionDataContainer(
final File tsFile, final PipePattern pattern, final long startTime,
final long endTime)
throws IOException {
@@ -122,15 +126,20 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
deviceIsAlignedMap = readDeviceIsAlignedMap();
memoryRequiredInBytes +=
PipeMemoryWeighUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);
- measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
+ // Filter devices that may overlap with pattern first
+ // to avoid reading all time-series of all devices.
+ final Set<IDeviceID> devices =
filterDevicesByPattern(deviceIsAlignedMap.keySet());
+
+ measurementDataTypeMap = readFilteredFullPathDataTypeMap(devices);
memoryRequiredInBytes +=
PipeMemoryWeighUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
- deviceMeasurementsMap =
tsFileSequenceReader.getDeviceMeasurementsMap();
+ deviceMeasurementsMap = readFilteredDeviceMeasurementsMap(devices);
memoryRequiredInBytes +=
PipeMemoryWeighUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
}
allocatedMemoryBlock =
PipeResourceManager.memory().forceAllocate(memoryRequiredInBytes);
+ // Filter again to get the final deviceMeasurementsMap that exactly
matches the pattern.
deviceMeasurementsMapIterator =
filterDeviceMeasurementsMapByPattern(deviceMeasurementsMap).entrySet().iterator();
@@ -198,6 +207,67 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
return deviceIsAlignedResultMap;
}
+  /**
+   * Narrows {@code devices} to those the pipe pattern may match, so that timeseries metadata is
+   * only read for devices that can contribute data. A device is kept when the pattern covers it or
+   * may overlap with it.
+   *
+   * <p>When there is no pattern, or the pattern is the root pattern, no filtering is needed and the
+   * input set itself (not a copy) is returned.
+   *
+   * @param devices candidate devices, e.g. the keys of the device-is-aligned map
+   * @return {@code devices} itself when no filtering is needed, otherwise a new set holding only
+   *     the devices the pattern may match
+   */
+  private Set<IDeviceID> filterDevicesByPattern(final Set<IDeviceID> devices) {
+    final boolean matchesEverything = Objects.isNull(pattern) || pattern.isRoot();
+    if (matchesEverything) {
+      return devices;
+    }
+
+    final Set<IDeviceID> candidates = new HashSet<>(devices);
+    // Drop every device that the pattern can neither cover nor overlap with.
+    candidates.removeIf(
+        device -> {
+          final String deviceId = ((PlainDeviceID) device).toStringID();
+          return !pattern.coversDevice(deviceId) && !pattern.mayOverlapWithDevice(deviceId);
+        });
+    return candidates;
+  }
+
+  /**
+   * This method is similar to {@link TsFileSequenceReader#getFullPathDataTypeMap()}, but only reads
+   * the given devices.
+   *
+   * <p>Keys of the returned map are full paths of the form {@code <deviceId>.<measurementId>};
+   * values are the data types recorded in each timeseries' metadata.
+   *
+   * <p>NOTE(review): the constructor also calls {@link #readFilteredDeviceMeasurementsMap(Set)}
+   * with the same devices, so each device's metadata is read from the file twice. Reading it once
+   * and deriving both maps from that single read would avoid the second pass.
+   *
+   * @param devices devices whose timeseries metadata should be read
+   * @return a mutable map from full timeseries path to data type
+   * @throws IOException if reading device metadata from the TsFile fails
+   */
+  private Map<String, TSDataType> readFilteredFullPathDataTypeMap(final Set<IDeviceID> devices)
+      throws IOException {
+    final Map<String, TSDataType> result = new HashMap<>();
+
+    for (final IDeviceID device : devices) {
+      // The path prefix is identical for every timeseries of this device, so build it only once
+      // instead of re-deriving the device string inside the per-timeseries lambda.
+      final String devicePathPrefix = ((PlainDeviceID) device).toStringID() + ".";
+      tsFileSequenceReader
+          .readDeviceMetadata(device)
+          .values()
+          .forEach(
+              timeseriesMetadata ->
+                  result.put(
+                      devicePathPrefix + timeseriesMetadata.getMeasurementId(),
+                      timeseriesMetadata.getTsDataType()));
+    }
+
+    return result;
+  }
+
+  /**
+   * Reads the measurement IDs of each given device. This is the filtered counterpart of {@link
+   * TsFileSequenceReader#getDeviceMeasurementsMap()}: only the metadata of {@code devices} is read.
+   *
+   * <p>A device whose metadata yields no timeseries gets no entry in the returned map.
+   *
+   * @param devices devices whose timeseries metadata should be read
+   * @return a mutable map from device to its measurement IDs, in the order the device's metadata
+   *     map yields them
+   * @throws IOException if reading device metadata from the TsFile fails
+   */
+  private Map<IDeviceID, List<String>> readFilteredDeviceMeasurementsMap(
+      final Set<IDeviceID> devices) throws IOException {
+    final Map<IDeviceID, List<String>> deviceToMeasurements = new HashMap<>();
+
+    for (final IDeviceID device : devices) {
+      final List<String> measurementIds = new ArrayList<>();
+      tsFileSequenceReader
+          .readDeviceMetadata(device)
+          .values()
+          .forEach(timeseriesMetadata -> measurementIds.add(timeseriesMetadata.getMeasurementId()));
+      // Only devices that have at least one timeseries get an entry.
+      if (!measurementIds.isEmpty()) {
+        deviceToMeasurements.put(device, measurementIds);
+      }
+    }
+
+    return deviceToMeasurements;
+  }
+
/**
* @return {@link TabletInsertionEvent} in a streaming way
*/