This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch update_last_cache_in_load
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/update_last_cache_in_load by
this push:
new 566c2b8984d add memory control when caching last values
566c2b8984d is described below
commit 566c2b8984d195b847a105aee016c388124f1844
Author: Tian Jiang <[email protected]>
AuthorDate: Thu May 29 11:40:19 2025 +0800
add memory control when caching last values
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 ++
.../dataregion/utils/TsFileResourceUtils.java | 37 ++++++++++--
.../db/storageengine/load/LoadTsFileManager.java | 67 ++++++++++++++++------
4 files changed, 95 insertions(+), 25 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 8651049d899..b91cb14f322 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1158,6 +1158,8 @@ public class IoTDBConfig {
*/
private boolean cacheLastValuesForLoad = true;
+ private long cacheLastValuesMemoryBudgetInByte = 4 * 1024 * 1024;
+
IoTDBConfig() {}
public int getMaxLogEntriesNumPerBatch() {
@@ -4096,4 +4098,12 @@ public class IoTDBConfig {
public void setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) {
this.cacheLastValuesForLoad = cacheLastValuesForLoad;
}
+
+ public long getCacheLastValuesMemoryBudgetInByte() {
+ return cacheLastValuesMemoryBudgetInByte;
+ }
+
+ public void setCacheLastValuesMemoryBudgetInByte(long cacheLastValuesMemoryBudgetInByte) {
+ this.cacheLastValuesMemoryBudgetInByte = cacheLastValuesMemoryBudgetInByte;
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index e1a9d155c98..3e32fad522b 100755
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2272,6 +2272,12 @@ public class IoTDBDescriptor {
Boolean.parseBoolean(
properties.getProperty(
"cache_last_values_for_load",
String.valueOf(conf.isCacheLastValuesForLoad()))));
+
+ conf.setCacheLastValuesMemoryBudgetInByte(
+ Long.parseLong(
+ properties.getProperty(
+ "cache_last_values_memory_budget_in_byte",
+ String.valueOf(conf.getCacheLastValuesMemoryBudgetInByte()))));
}
private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws IOException {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
index 16e7939069f..3672f816414 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
@@ -49,6 +49,7 @@ import org.apache.tsfile.read.reader.page.PageReader;
import org.apache.tsfile.read.reader.page.TimePageReader;
import org.apache.tsfile.read.reader.page.ValuePageReader;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
@@ -430,12 +431,13 @@ public class TsFileResourceUtils {
: tsFileResource.getTimeIndex();
Map<IDeviceID, List<Pair<String, TimeValuePair>>> deviceLastValues =
tsFileResource.getLastValues();
+ long lastValueMemCost = 0;
if (cacheLastValue && deviceLastValues == null) {
deviceLastValues = new HashMap<>(device2Metadata.size());
}
for (Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry : device2Metadata.entrySet()) {
List<Pair<String, TimeValuePair>> seriesLastValues = null;
- if (cacheLastValue) {
+ if (deviceLastValues != null) {
seriesLastValues = new ArrayList<>(entry.getValue().size());
}
@@ -443,7 +445,7 @@ public class TsFileResourceUtils {
newTimeIndex.updateStartTime(
entry.getKey(), timeseriesMetaData.getStatistics().getStartTime());
newTimeIndex.updateEndTime(entry.getKey(), timeseriesMetaData.getStatistics().getEndTime());
- if (cacheLastValue) {
+ if (deviceLastValues != null) {
if (timeseriesMetaData.getTsDataType() != TSDataType.BLOB) {
TsPrimitiveType value;
value =
@@ -461,10 +463,33 @@ public class TsFileResourceUtils {
}
}
}
- if (cacheLastValue) {
- deviceLastValues
- .computeIfAbsent(entry.getKey(), deviceID -> new ArrayList<>())
- .addAll(seriesLastValues);
+ if (deviceLastValues != null) {
+ lastValueMemCost += entry.getKey().ramBytesUsed();
+ for (Pair<String, TimeValuePair> lastValue : seriesLastValues) {
+ if (lastValue == null) {
+ continue;
+ }
+ // pair
+ lastValueMemCost += RamUsageEstimator.shallowSizeOf(lastValue);
+ // measurement name
+ lastValueMemCost += RamUsageEstimator.sizeOf(lastValue.left);
+ TimeValuePair right = lastValue.getRight();
+ lastValueMemCost +=
+ right != null ? right.getSize() : RamUsageEstimator.NUM_BYTES_OBJECT_REF;
+ }
+ // ArrayList
+ lastValueMemCost +=
+ (long) seriesLastValues.size() * RamUsageEstimator.NUM_BYTES_OBJECT_REF
+ + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
+ + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
+ lastValueMemCost += RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
+ if (lastValueMemCost <= config.getCacheLastValuesMemoryBudgetInByte()) {
+ deviceLastValues
+ .computeIfAbsent(entry.getKey(), deviceID -> new ArrayList<>())
+ .addAll(seriesLastValues);
+ } else {
+ deviceLastValues = null;
+ }
}
}
tsFileResource.setTimeIndex(newTimeIndex);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index 76e315b7050..87db5cd25c1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -60,6 +60,7 @@ import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
@@ -78,6 +79,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -543,33 +545,60 @@ public class LoadTsFileManager {
if (IoTDBDescriptor.getInstance().getConfig().isCacheLastValuesForLoad()) {
deviceLastValues = new HashMap<>();
}
+ AtomicLong lastValuesMemCost = new AtomicLong(0);
+
for (final ChunkGroupMetadata chunkGroupMetadata : writer.getChunkGroupMetadataList()) {
final IDeviceID device = chunkGroupMetadata.getDevice();
for (final ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
tsFileResource.updateStartTime(device, chunkMetadata.getStartTime());
tsFileResource.updateEndTime(device, chunkMetadata.getEndTime());
if (deviceLastValues != null) {
- deviceLastValues
- .computeIfAbsent(device, d -> new HashMap<>())
- .compute(
- chunkMetadata.getMeasurementUid(),
- (m, oldPair) -> {
- if (oldPair != null && oldPair.getTimestamp() > chunkMetadata.getEndTime()) {
- return oldPair;
- }
- TsPrimitiveType lastValue =
- chunkMetadata.getStatistics() != null
- && chunkMetadata.getDataType() != TSDataType.BLOB
- ? TsPrimitiveType.getByType(
- chunkMetadata.getDataType() == TSDataType.VECTOR
- ? TSDataType.INT64
- : chunkMetadata.getDataType(),
- chunkMetadata.getStatistics().getLastValue())
- : null;
- return lastValue != null
+ Map<String, TimeValuePair> deviceMap =
+ deviceLastValues.computeIfAbsent(
+ device,
+ d -> {
+ Map<String, TimeValuePair> map = new HashMap<>();
+ lastValuesMemCost.addAndGet(RamUsageEstimator.shallowSizeOf(map));
+ lastValuesMemCost.addAndGet(device.ramBytesUsed());
+ return map;
+ });
+ int prevSize = deviceMap.size();
+ deviceMap.compute(
+ chunkMetadata.getMeasurementUid(),
+ (m, oldPair) -> {
+ if (oldPair != null && oldPair.getTimestamp() > chunkMetadata.getEndTime()) {
+ return oldPair;
+ }
+ TsPrimitiveType lastValue =
+ chunkMetadata.getStatistics() != null
+ && chunkMetadata.getDataType() != TSDataType.BLOB
+ ? TsPrimitiveType.getByType(
+ chunkMetadata.getDataType() == TSDataType.VECTOR
+ ? TSDataType.INT64
+ : chunkMetadata.getDataType(),
+ chunkMetadata.getStatistics().getLastValue())
+ : null;
+ TimeValuePair timeValuePair =
+ lastValue != null
? new TimeValuePair(chunkMetadata.getEndTime(), lastValue)
: null;
- });
+ if (oldPair != null) {
+ lastValuesMemCost.addAndGet(-oldPair.getSize());
+ }
+ if (timeValuePair != null) {
+ lastValuesMemCost.addAndGet(timeValuePair.getSize());
+ }
+ return timeValuePair;
+ });
+ int afterSize = deviceMap.size();
+ lastValuesMemCost.addAndGet(
+ (afterSize - prevSize) * RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
+ if (lastValuesMemCost.get()
+ > IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getCacheLastValuesMemoryBudgetInByte()) {
+ deviceLastValues = null;
+ }
}
}
}