This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch QueryImprove in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4b0c9843f2990062da101a8e1eec7979d577bb94 Author: JackieTien97 <[email protected]> AuthorDate: Fri Mar 17 14:28:35 2023 +0800 Filter unsatisfied TimePartition while preparing resource list --- .../apache/iotdb/db/engine/storagegroup/DataRegion.java | 5 +++-- .../db/engine/storagegroup/HashLastFlushTimeMap.java | 12 ++++++++++++ .../db/engine/storagegroup/IDTableLastFlushTimeMap.java | 6 ++++++ .../iotdb/db/engine/storagegroup/ILastFlushTimeMap.java | 3 +++ .../iotdb/db/engine/storagegroup/TsFileManager.java | 16 ++++++++++++++++ 5 files changed, 40 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index 9fe0e3346f..1864b0425f 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -1700,9 +1700,10 @@ public class DataRegion implements IDataRegionForQuery { List<PartialPath> pathList, String singleDeviceId, QueryContext context, Filter timeFilter) throws QueryProcessException { try { + List<Long> timePartitions = lastFlushTimeMap.getAllSatisfiedTimePartitions(singleDeviceId); List<TsFileResource> seqResources = getFileResourceListForQuery( - tsFileManager.getTsFileList(true), + tsFileManager.getTsFileList(timePartitions, true), upgradeSeqFileList, pathList, singleDeviceId, @@ -1711,7 +1712,7 @@ public class DataRegion implements IDataRegionForQuery { true); List<TsFileResource> unseqResources = getFileResourceListForQuery( - tsFileManager.getTsFileList(false), + tsFileManager.getTsFileList(timePartitions, false), upgradeUnseqFileList, pathList, singleDeviceId, diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java index 50d524a3e5..d34a52cdad 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java @@ -22,9 +22,11 @@ package org.apache.iotdb.db.engine.storagegroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class HashLastFlushTimeMap implements ILastFlushTimeMap { @@ -245,4 +247,14 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap { } return 0; } + + @Override + public List<Long> getAllSatisfiedTimePartitions(String deviceId) { + return deviceId == null + ? new ArrayList<>(newlyFlushedPartitionLatestFlushedTimeForEachDevice.keySet()) + : newlyFlushedPartitionLatestFlushedTimeForEachDevice.entrySet().stream() + .filter(entry -> entry.getValue().containsKey(deviceId)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java index 03dd87476a..3a56a2dd41 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.engine.storagegroup; import org.apache.iotdb.db.metadata.idtable.IDTable; import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -184,4 +185,9 @@ public class IDTableLastFlushTimeMap implements ILastFlushTimeMap { } return 0; } + + @Override + public List<Long> getAllSatisfiedTimePartitions(String deviceId) { + return new ArrayList<>(partitionSet); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java index f344b73f31..0da4674073 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.engine.storagegroup; +import java.util.List; import java.util.Map; /** This interface manages last time and flush time for sequence and unsequence determination */ @@ -70,4 +71,6 @@ public interface ILastFlushTimeMap { void removePartition(long partitionId); long getMemSize(long partitionId); + + List<Long> getAllSatisfiedTimePartitions(String deviceId); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java index 86fa5e742e..ffd38cbde2 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java @@ -85,6 +85,22 @@ public class TsFileManager { } } + public List<TsFileResource> getTsFileList(List<Long> timePartitions, boolean sequence) { + // the iteration of ConcurrentSkipListMap is not concurrent secure + // so we must add read lock here + readLock(); + try { + List<TsFileResource> allResources = new ArrayList<>(); + Map<Long, TsFileResourceList> chosenMap = sequence ? sequenceFiles : unsequenceFiles; + for (Long timePartition : timePartitions) { + allResources.addAll(chosenMap.get(timePartition).getArrayList()); + } + return allResources; + } finally { + readUnlock(); + } + } + public TsFileResourceList getOrCreateSequenceListByTimePartition(long timePartition) { writeLock("getOrCreateSequenceListByTimePartition"); try {
