This is an automated email from the ASF dual-hosted git repository.
shuwenwei pushed a commit to branch addMemoryControlForModEntriesInQuery-1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/addMemoryControlForModEntriesInQuery-1.3 by this push:
new b614a8db74d Add memory control for mod entries in query
b614a8db74d is described below
commit b614a8db74de488bff86835df59b1683e8733f95
Author: shuwenwei <[email protected]>
AuthorDate: Thu Aug 14 17:12:12 2025 +0800
Add memory control for mod entries in query
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 37 ++++++-----
.../execution/MemoryEstimationHelper.java | 16 +++++
.../fragment/FragmentInstanceContext.java | 76 ++++++++++++++++++++++
.../execution/fragment/QueryContext.java | 63 ++++++++++--------
.../dataregion/modification/Deletion.java | 30 +++++++--
.../dataregion/modification/Modification.java | 4 +-
.../filescan/impl/ClosedFileScanHandleImpl.java | 16 ++++-
.../conf/iotdb-system.properties.template | 7 ++
.../apache/iotdb/commons/path/PathPatternNode.java | 19 +++++-
.../apache/iotdb/commons/path/PatternTreeMap.java | 12 +++-
pom.xml | 2 +-
12 files changed, 237 insertions(+), 56 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index fd52e61603f..1a3f87a1a83 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -454,6 +454,9 @@ public class IoTDBConfig {
/** The buffer for sort operation */
private long sortBufferSize = 32 * 1024 * 1024L;
+ /** Mods cache size limit per fi */
+ private long modsCacheSizeLimitPerFI = 32 * 1024 * 1024;
+
/**
* The strategy of inner space compaction task. There are just one inner
space compaction strategy
* SIZE_TIRED_COMPACTION:
@@ -4235,6 +4238,14 @@ public class IoTDBConfig {
return sortBufferSize;
}
+ public void setModsCacheSizeLimitPerFI(long modsCacheSizeLimitPerFI) {
+ this.modsCacheSizeLimitPerFI = modsCacheSizeLimitPerFI;
+ }
+
+ public long getModsCacheSizeLimitPerFI() {
+ return modsCacheSizeLimitPerFI;
+ }
+
public void setSortTmpDir(String sortTmpDir) {
this.sortTmpDir = sortTmpDir;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 7c61bab1a1f..042e7969bf7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -90,6 +90,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.Set;
+import java.util.function.LongConsumer;
import java.util.regex.Pattern;
public class IoTDBDescriptor {
@@ -1081,7 +1082,10 @@ public class IoTDBDescriptor {
// The buffer for sort operator to calculate
- loadSortBuffer(properties);
+ loadFixedSizeLimitForQuery(properties, "sort_buffer_size_in_bytes",
conf::setSortBufferSize);
+
+ loadFixedSizeLimitForQuery(
+ properties, "mods_cache_size_limit_per_fi_in_bytes",
conf::setModsCacheSizeLimitPerFI);
// tmp filePath for sort operator
conf.setSortTmpDir(properties.getProperty("sort_tmp_dir",
conf.getSortTmpDir()));
@@ -1106,21 +1110,17 @@ public class IoTDBDescriptor {
loadTrustedUriPattern(properties);
}
- private void loadSortBuffer(TrimProperties properties) {
- long defaultValue = calculateDefaultSortBufferSize();
- long sortBufferSize =
- Long.parseLong(
- properties.getProperty("sort_buffer_size_in_bytes",
Long.toString(defaultValue)));
- if (sortBufferSize <= 0) {
- sortBufferSize = defaultValue;
- }
- // The buffer for sort operator to calculate
- conf.setSortBufferSize(sortBufferSize);
- }
-
- public static long calculateDefaultSortBufferSize() {
- return Math.min(
- 32 * 1024 * 1024L, conf.getAllocateMemoryForOperators() /
conf.getQueryThreadCount() / 2);
+ private void loadFixedSizeLimitForQuery(
+ TrimProperties properties, String name, LongConsumer setFunction) {
+ long defaultValue =
+ Math.min(
+ 32 * 1024 * 1024L,
+ conf.getAllocateMemoryForOperators() / conf.getQueryThreadCount()
/ 2);
+ long size = Long.parseLong(properties.getProperty(name,
Long.toString(defaultValue)));
+ if (size <= 0) {
+ size = defaultValue;
+ }
+ setFunction.accept(size);
}
private void reloadConsensusProps(TrimProperties properties) throws
IOException {
@@ -2058,7 +2058,10 @@ public class IoTDBDescriptor {
ConfigurationFileUtils.getConfigurationDefaultValue("tvlist_sort_threshold"))));
// sort_buffer_size_in_bytes
- loadSortBuffer(properties);
+ loadFixedSizeLimitForQuery(properties, "sort_buffer_size_in_bytes",
conf::setSortBufferSize);
+
+ loadFixedSizeLimitForQuery(
+ properties, "mods_cache_size_limit_per_fi_in_bytes",
conf::setModsCacheSizeLimitPerFI);
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java
index 9da6b85e9f6..ba6660f3d6a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.Accountable;
import org.apache.tsfile.utils.RamUsageEstimator;
@@ -47,6 +48,8 @@ public class MemoryEstimationHelper {
RamUsageEstimator.shallowSizeOfInstance(ArrayList.class);
private static final long INTEGER_INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(Integer.class);
+ public static final long TIME_RANGE_INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(TimeRange.class);
private MemoryEstimationHelper() {
// hide the constructor
@@ -94,6 +97,19 @@ public class MemoryEstimationHelper {
return totalSize;
}
+ public static long getEstimatedSizeOfMeasurementPathNodes(
+ @Nullable final PartialPath partialPath) {
+ if (partialPath == null) {
+ return 0;
+ }
+ long totalSize = MEASUREMENT_PATH_INSTANCE_SIZE;
+ String[] nodes = partialPath.getNodes();
+ if (nodes != null && nodes.length > 0) {
+ totalSize +=
Arrays.stream(nodes).mapToLong(RamUsageEstimator::sizeOf).sum();
+ }
+ return totalSize;
+ }
+
// This method should only be called if the content in the current
PartialPath comes from other
// structures whose memory cost have already been calculated.
public static long getEstimatedSizeOfCopiedPartialPath(@Nullable final
PartialPath partialPath) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index f6afb5b71b6..c5dff1f9666 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -23,7 +23,10 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.DeviceContext;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
@@ -38,17 +41,21 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
import
org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegionScan;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceType;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.filter.basic.Filter;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,6 +77,7 @@ import static
org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.EMP
public class FragmentInstanceContext extends QueryContext {
private static final Logger LOGGER =
LoggerFactory.getLogger(FragmentInstanceContext.class);
+ private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
private static final long END_TIME_INITIAL_VALUE = -1L;
// wait over 5s for driver to close is abnormal
private static final long LONG_WAIT_DURATION = 5_000_000_000L;
@@ -84,6 +92,8 @@ public class FragmentInstanceContext extends QueryContext {
// it will only be used once, after sharedQueryDataSource being inited, it
will be set to null
protected List<PartialPath> sourcePaths;
+
+ private boolean singleSourcePath = false;
// Used for region scan.
private Map<IDeviceID, DeviceContext> devicePathsToContext;
@@ -307,6 +317,64 @@ public class FragmentInstanceContext extends QueryContext {
lastExecutionStartTime.set(now);
}
+ @Override
+ protected boolean checkIfModificationExists(TsFileResource tsFileResource) {
+ if (isSingleSourcePath()) {
+ return tsFileResource.getModFile().exists();
+ }
+ if (nonExistentModFiles.contains(tsFileResource.getTsFileID())) {
+ return false;
+ }
+
+ ModificationFile modFile = tsFileResource.getModFile();
+ if (!modFile.exists()) {
+ if (nonExistentModFiles.add(tsFileResource.getTsFileID())
+ && memoryReservationManager != null) {
+
memoryReservationManager.reserveMemoryCumulatively(RamUsageEstimator.NUM_BYTES_OBJECT_REF);
+ }
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ protected PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>
getAllModifications(
+ TsFileResource resource) {
+ if (isSingleSourcePath() || memoryReservationManager == null) {
+ return loadAllModificationsFromDisk(resource);
+ }
+
+ AtomicReference<PatternTreeMap<Modification,
PatternTreeMapFactory.ModsSerializer>>
+ atomicReference = new AtomicReference<>();
+ PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>
cachedResult =
+ fileModCache.computeIfAbsent(
+ resource.getTsFileID(),
+ k -> {
+ PatternTreeMap<Modification,
PatternTreeMapFactory.ModsSerializer> allMods =
+ loadAllModificationsFromDisk(resource);
+ atomicReference.set(allMods);
+ if (cachedModEntriesSize.get() >=
config.getModsCacheSizeLimitPerFI()) {
+ return null;
+ }
+ long memCost =
+ RamUsageEstimator.sizeOfObject(allMods)
+ +
RamUsageEstimator.SHALLOW_SIZE_OF_CONCURRENT_HASHMAP_ENTRY;
+ long alreadyUsedMemoryForCachedModEntries =
cachedModEntriesSize.get();
+ while (alreadyUsedMemoryForCachedModEntries + memCost
+ < config.getModsCacheSizeLimitPerFI()) {
+ if (cachedModEntriesSize.compareAndSet(
+ alreadyUsedMemoryForCachedModEntries,
+ alreadyUsedMemoryForCachedModEntries + memCost)) {
+ memoryReservationManager.reserveMemoryCumulatively(memCost);
+ return allMods;
+ }
+ alreadyUsedMemoryForCachedModEntries =
cachedModEntriesSize.get();
+ }
+ return null;
+ });
+ return cachedResult == null ? atomicReference.get() : cachedResult;
+ }
+
// the state change listener is added here in a separate initialize() method
// instead of the constructor to prevent leaking the "this" reference to
// another thread, which will cause unsafe publication of this instance.
@@ -450,6 +518,9 @@ public class FragmentInstanceContext extends QueryContext {
public void setSourcePaths(List<PartialPath> sourcePaths) {
this.sourcePaths = sourcePaths;
+ if (sourcePaths != null && sourcePaths.size() == 1) {
+ singleSourcePath = true;
+ }
}
public void setDevicePathsToContext(Map<IDeviceID, DeviceContext>
devicePathsToContext) {
@@ -754,6 +825,7 @@ public class FragmentInstanceContext extends QueryContext {
// release TVList/AlignedTVList owned by current query
releaseTVListOwnedByQuery();
+ fileModCache = null;
dataRegion = null;
globalTimeFilter = null;
sharedQueryDataSource = null;
@@ -928,4 +1000,8 @@ public class FragmentInstanceContext extends QueryContext {
public boolean ignoreNotExistsDevice() {
return ignoreNotExistsDevice;
}
+
+ public boolean isSingleSourcePath() {
+ return singleSourcePath;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
index f0a210e1edf..332bd915d3d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
@@ -41,6 +41,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicLong;
/** QueryContext contains the shared information with in a query. */
public class QueryContext {
@@ -48,13 +49,15 @@ public class QueryContext {
private QueryStatistics queryStatistics = new QueryStatistics();
/**
- * The key is the path of a ModificationFile and the value is all
Modifications in this file. We
- * use this field because each call of Modification.getModifications()
return a copy of the
- * Modifications, and we do not want it to create multiple copies within a
query.
+ * The key is TsFileID and the value is all Modifications in this file. We
use this field because
+ * each call of Modification.getModifications() return a copy of the
Modifications, and we do not
+ * want it to create multiple copies within a query.
*/
- private final Map<String, PatternTreeMap<Modification, ModsSerializer>>
fileModCache =
+ protected Map<TsFileID, PatternTreeMap<Modification, ModsSerializer>>
fileModCache =
new ConcurrentHashMap<>();
+ protected AtomicLong cachedModEntriesSize = new AtomicLong(0);
+
protected long queryId;
private boolean debug;
@@ -64,7 +67,7 @@ public class QueryContext {
private volatile boolean isInterrupted = false;
- private final Set<TsFileID> nonExistentModFiles = new
CopyOnWriteArraySet<>();
+ protected final Set<TsFileID> nonExistentModFiles = new
CopyOnWriteArraySet<>();
// referenced TVLists for the query
protected final Set<TVList> tvListSet = new HashSet<>();
@@ -96,18 +99,21 @@ public class QueryContext {
return true;
}
- private PatternTreeMap<Modification, ModsSerializer> getAllModifications(
- ModificationFile modFile) {
+ protected PatternTreeMap<Modification, ModsSerializer> getAllModifications(
+ TsFileResource resource) {
return fileModCache.computeIfAbsent(
- modFile.getFilePath(),
- k -> {
- PatternTreeMap<Modification, ModsSerializer> modifications =
- PatternTreeMapFactory.getModsPatternTreeMap();
- for (Modification modification : modFile.getModificationsIter()) {
- modifications.append(modification.getPath(), modification);
- }
- return modifications;
- });
+ resource.getTsFileID(), k -> loadAllModificationsFromDisk(resource));
+ }
+
+ public PatternTreeMap<Modification, ModsSerializer>
loadAllModificationsFromDisk(
+ TsFileResource resource) {
+ PatternTreeMap<Modification, ModsSerializer> modifications =
+ PatternTreeMapFactory.getModsPatternTreeMap();
+ Iterable<Modification> modEntryIterator =
resource.getModFile().getModificationsIter();
+ for (Modification modification : modEntryIterator) {
+ modifications.append(modification.getPath(), modification);
+ }
+ return modifications;
}
public List<Modification> getPathModifications(
@@ -119,20 +125,16 @@ public class QueryContext {
}
return ModificationFile.sortAndMerge(
- getAllModifications(tsFileResource.getModFile())
- .getOverlapped(new PartialPath(deviceID, measurement)));
+ getAllModifications(tsFileResource).getOverlapped(new
PartialPath(deviceID, measurement)));
}
- public List<Modification> getPathModifications(TsFileResource
tsFileResource, IDeviceID deviceID)
+ public List<Modification> getPathModifications(
+ PatternTreeMap<Modification, ModsSerializer> fileMods, IDeviceID
deviceID)
throws IllegalPathException {
- // if the mods file does not exist, do not add it to the cache
- if (!checkIfModificationExists(tsFileResource)) {
+ if (fileMods == null) {
return Collections.emptyList();
}
-
- return ModificationFile.sortAndMerge(
- getAllModifications(tsFileResource.getModFile())
- .getDeviceOverlapped(new PartialPath(deviceID)));
+ return ModificationFile.sortAndMerge(fileMods.getDeviceOverlapped(new
PartialPath(deviceID)));
}
/**
@@ -145,8 +147,15 @@ public class QueryContext {
return Collections.emptyList();
}
- return ModificationFile.sortAndMerge(
- getAllModifications(tsFileResource.getModFile()).getOverlapped(path));
+ return getPathModifications(getAllModifications(tsFileResource), path);
+ }
+
+ public List<Modification> getPathModifications(
+ PatternTreeMap<Modification, ModsSerializer> fileMods, PartialPath path)
{
+ if (fileMods == null) {
+ return Collections.emptyList();
+ }
+ return ModificationFile.sortAndMerge(fileMods.getOverlapped(path));
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java
index 5ab469fde10..07f8aecbfc0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java
@@ -21,9 +21,12 @@ package
org.apache.iotdb.db.storageengine.dataregion.modification;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
-import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
+import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataInputStream;
@@ -34,6 +37,8 @@ import java.util.Objects;
/** Deletion is a delete operation on a timeseries. */
public class Deletion extends Modification implements Cloneable {
+ private static final long SHALLOW_SIZE =
RamUsageEstimator.shallowSizeOfInstance(Deletion.class);
+
/** data within the interval [startTime, endTime] are to be deleted. */
private TimeRange timeRange;
@@ -117,10 +122,18 @@ public class Deletion extends Modification implements
Cloneable {
long startTime = stream.readLong();
long endTime = stream.readLong();
return new Deletion(
-
DataNodeDevicePathCache.getInstance().getPartialPath(ReadWriteIOUtils.readString(stream)),
- 0,
- startTime,
- endTime);
+ getMeasurementPath(ReadWriteIOUtils.readString(stream)), 0, startTime,
endTime);
+ }
+
+ private static PartialPath getMeasurementPath(String path) throws
IllegalPathException {
+ // In this place, we can be sure that the path pattern here has been
checked by antlr before, so
+ // when conditions permit, a lighter split method can be used here.
+ if (path.contains(TsFileConstant.BACK_QUOTE_STRING)) {
+ return new PartialPath(PathUtils.splitPathToDetachedNodes(path));
+ } else {
+ String[] nodes = path.split(TsFileConstant.PATH_SEPARATER_NO_REGEX);
+ return new PartialPath(nodes);
+ }
}
public long getSerializedSize() {
@@ -164,4 +177,11 @@ public class Deletion extends Modification implements
Cloneable {
public Deletion clone() {
return new Deletion(getPath(), getFileOffset(), getStartTime(),
getEndTime());
}
+
+ @Override
+ public long ramBytesUsed() {
+ return SHALLOW_SIZE
+ + MemoryEstimationHelper.TIME_RANGE_INSTANCE_SIZE
+ + MemoryEstimationHelper.getEstimatedSizeOfMeasurementPathNodes(path);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Modification.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Modification.java
index 4ca1f11b6d5..988c1be9093 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Modification.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Modification.java
@@ -21,10 +21,12 @@ package
org.apache.iotdb.db.storageengine.dataregion.modification;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.tsfile.utils.Accountable;
+
import java.util.Objects;
/** Modification represents an UPDATE or DELETE operation on a certain
timeseries. */
-public abstract class Modification {
+public abstract class Modification implements Accountable {
protected Type type;
protected PartialPath path;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java
index 9236924d192..055b8517b48 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
@@ -36,6 +38,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeI
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import
org.apache.iotdb.db.storageengine.dataregion.utils.TsFileDeviceStartEndTimeIterator;
import org.apache.iotdb.db.utils.ModificationUtils;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
@@ -61,6 +64,7 @@ public class ClosedFileScanHandleImpl implements
IFileScanHandle {
private final TsFileResource tsFileResource;
private final QueryContext queryContext;
+ private PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>
curFileMods = null;
// Used to cache the modifications of each timeseries
private final Map<IDeviceID, Map<String, List<TimeRange>>>
deviceToModifications;
@@ -81,7 +85,11 @@ public class ClosedFileScanHandleImpl implements
IFileScanHandle {
@Override
public boolean isDeviceTimeDeleted(IDeviceID deviceID, long timestamp)
throws IllegalPathException {
- List<Modification> modifications =
queryContext.getPathModifications(tsFileResource, deviceID);
+ curFileMods =
+ curFileMods != null
+ ? curFileMods
+ : queryContext.loadAllModificationsFromDisk(tsFileResource);
+ List<Modification> modifications =
queryContext.getPathModifications(curFileMods, deviceID);
List<TimeRange> timeRangeList =
modifications.stream()
.filter(Deletion.class::isInstance)
@@ -100,8 +108,12 @@ public class ClosedFileScanHandleImpl implements
IFileScanHandle {
return ModificationUtils.isPointDeleted(timestamp,
modificationTimeRange.get(timeSeriesName));
}
+ curFileMods =
+ curFileMods != null
+ ? curFileMods
+ : queryContext.loadAllModificationsFromDisk(tsFileResource);
List<Modification> modifications =
- queryContext.getPathModifications(tsFileResource, deviceID,
timeSeriesName);
+ queryContext.getPathModifications(curFileMods, new
PartialPath(deviceID, timeSeriesName));
List<TimeRange> timeRangeList =
modifications.stream()
.filter(Deletion.class::isInstance)
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 70bd7e15113..1824ab3e750 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1067,6 +1067,13 @@ batch_size=100000
# Datatype: long
sort_buffer_size_in_bytes=0
+# The maximum mod entries size that each FragmentInstance can cache.
+# if mods_cache_size_limit_per_fi_in_bytes <= 0, default value will be used,
default value = min(32MB, memory for query operators / query_thread_count / 2)
+# if mods_cache_size_limit_per_fi_in_bytes > 0, the specified value will be
used.
+# effectiveMode: hot_reload
+# Datatype: long
+mods_cache_size_limit_per_fi_in_bytes=0
+
# The threshold of operator count in the result set of EXPLAIN ANALYZE, if the
number of operator in the result set is larger than this threshold, operator
will be merged.
# effectiveMode: hot_reload
# Datatype: int
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternNode.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternNode.java
index 80e48f2e287..89ae7444d19 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternNode.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternNode.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.commons.path;
+import org.apache.tsfile.utils.Accountable;
import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
@@ -40,8 +42,10 @@ import java.util.function.Supplier;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-public class PathPatternNode<V, VSerializer extends
PathPatternNode.Serializer<V>> {
-
+public class PathPatternNode<V, VSerializer extends
PathPatternNode.Serializer<V>>
+ implements Accountable {
+ private static final long SHALLOW_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(PathPatternNode.class);
private final String name;
private final Map<String, PathPatternNode<V, VSerializer>> children;
private Set<V> valueSet;
@@ -256,6 +260,17 @@ public class PathPatternNode<V, VSerializer extends
PathPatternNode.Serializer<V
return node;
}
+ @Override
+ public long ramBytesUsed() {
+ return SHALLOW_SIZE
+ + RamUsageEstimator.sizeOf(name)
+ + RamUsageEstimator.sizeOfHashSet(valueSet)
+ + RamUsageEstimator.sizeOfMapWithKnownShallowSize(
+ children,
+ RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP,
+ RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY);
+ }
+
/**
* Interface to support serialize and deserialize valueSet.
*
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PatternTreeMap.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PatternTreeMap.java
index dc35cb5a4c4..4726283b93c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PatternTreeMap.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PatternTreeMap.java
@@ -20,6 +20,9 @@ package org.apache.iotdb.commons.path;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.tsfile.utils.Accountable;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
import javax.annotation.concurrent.NotThreadSafe;
import java.util.ArrayList;
@@ -30,7 +33,9 @@ import java.util.function.BiConsumer;
import java.util.function.Supplier;
@NotThreadSafe
-public class PatternTreeMap<V, VSerializer extends
PathPatternNode.Serializer<V>> {
+public class PatternTreeMap<V, VSerializer extends
PathPatternNode.Serializer<V>>
+ implements Accountable {
+ private final long SHALLOW_SIZE =
RamUsageEstimator.shallowSizeOfInstance(PatternTreeMap.class);
private final PathPatternNode<V, VSerializer> root;
private final Supplier<? extends Set<V>> supplier;
private final BiConsumer<V, Set<V>> appendFunction;
@@ -246,4 +251,9 @@ public class PatternTreeMap<V, VSerializer extends
PathPatternNode.Serializer<V>
searchDeviceOverlapped(child, deviceNodes, pos + 1, resultSet);
}
}
+
+ @Override
+ public long ramBytesUsed() {
+ return SHALLOW_SIZE + RamUsageEstimator.sizeOfObject(root);
+ }
}
diff --git a/pom.xml b/pom.xml
index 58bd5e89bca..6d4d29c563f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -175,7 +175,7 @@
<thrift.version>0.14.1</thrift.version>
<xz.version>1.9</xz.version>
<zstd-jni.version>1.5.6-3</zstd-jni.version>
- <tsfile.version>1.1.2-250801-SNAPSHOT</tsfile.version>
+ <tsfile.version>1.1.2-250814-SNAPSHOT</tsfile.version>
</properties>
<!--
if we claim dependencies in dependencyManagement, then we do not claim