This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch update_last_cache_in_load
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/update_last_cache_in_load by this push:
     new 566c2b8984d add memory control when caching last values
566c2b8984d is described below

commit 566c2b8984d195b847a105aee016c388124f1844
Author: Tian Jiang <[email protected]>
AuthorDate: Thu May 29 11:40:19 2025 +0800

    add memory control when caching last values
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  6 ++
 .../dataregion/utils/TsFileResourceUtils.java      | 37 ++++++++++--
 .../db/storageengine/load/LoadTsFileManager.java   | 67 ++++++++++++++++------
 4 files changed, 95 insertions(+), 25 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 8651049d899..b91cb14f322 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1158,6 +1158,8 @@ public class IoTDBConfig {
    */
   private boolean cacheLastValuesForLoad = true;
 
+  private long cacheLastValuesMemoryBudgetInByte = 4 * 1024 * 1024;
+
   IoTDBConfig() {}
 
   public int getMaxLogEntriesNumPerBatch() {
@@ -4096,4 +4098,12 @@ public class IoTDBConfig {
   public void setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) {
     this.cacheLastValuesForLoad = cacheLastValuesForLoad;
   }
+
+  public long getCacheLastValuesMemoryBudgetInByte() {
+    return cacheLastValuesMemoryBudgetInByte;
+  }
+
+  public void setCacheLastValuesMemoryBudgetInByte(long 
cacheLastValuesMemoryBudgetInByte) {
+    this.cacheLastValuesMemoryBudgetInByte = cacheLastValuesMemoryBudgetInByte;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index e1a9d155c98..3e32fad522b 100755
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2272,6 +2272,12 @@ public class IoTDBDescriptor {
         Boolean.parseBoolean(
             properties.getProperty(
                 "cache_last_values_for_load", 
String.valueOf(conf.isCacheLastValuesForLoad()))));
+
+    conf.setCacheLastValuesMemoryBudgetInByte(
+        Long.parseLong(
+            properties.getProperty(
+                "cache_last_values_memory_budget_in_byte",
+                String.valueOf(conf.getCacheLastValuesMemoryBudgetInByte()))));
   }
 
   private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws 
IOException {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
index 16e7939069f..3672f816414 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
@@ -49,6 +49,7 @@ import org.apache.tsfile.read.reader.page.PageReader;
 import org.apache.tsfile.read.reader.page.TimePageReader;
 import org.apache.tsfile.read.reader.page.ValuePageReader;
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.TsPrimitiveType;
 import org.apache.tsfile.write.writer.TsFileIOWriter;
 import org.slf4j.Logger;
@@ -430,12 +431,13 @@ public class TsFileResourceUtils {
             : tsFileResource.getTimeIndex();
     Map<IDeviceID, List<Pair<String, TimeValuePair>>> deviceLastValues =
         tsFileResource.getLastValues();
+    long lastValueMemCost = 0;
     if (cacheLastValue && deviceLastValues == null) {
       deviceLastValues = new HashMap<>(device2Metadata.size());
     }
     for (Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry : 
device2Metadata.entrySet()) {
       List<Pair<String, TimeValuePair>> seriesLastValues = null;
-      if (cacheLastValue) {
+      if (deviceLastValues != null) {
         seriesLastValues = new ArrayList<>(entry.getValue().size());
       }
 
@@ -443,7 +445,7 @@ public class TsFileResourceUtils {
         newTimeIndex.updateStartTime(
             entry.getKey(), timeseriesMetaData.getStatistics().getStartTime());
         newTimeIndex.updateEndTime(entry.getKey(), 
timeseriesMetaData.getStatistics().getEndTime());
-        if (cacheLastValue) {
+        if (deviceLastValues != null) {
           if (timeseriesMetaData.getTsDataType() != TSDataType.BLOB) {
             TsPrimitiveType value;
             value =
@@ -461,10 +463,33 @@ public class TsFileResourceUtils {
           }
         }
       }
-      if (cacheLastValue) {
-        deviceLastValues
-            .computeIfAbsent(entry.getKey(), deviceID -> new ArrayList<>())
-            .addAll(seriesLastValues);
+      if (deviceLastValues != null) {
+        lastValueMemCost += entry.getKey().ramBytesUsed();
+        for (Pair<String, TimeValuePair> lastValue : seriesLastValues) {
+          if (lastValue == null) {
+            continue;
+          }
+          // pair
+          lastValueMemCost += RamUsageEstimator.shallowSizeOf(lastValue);
+          // measurement name
+          lastValueMemCost += RamUsageEstimator.sizeOf(lastValue.left);
+          TimeValuePair right = lastValue.getRight();
+          lastValueMemCost +=
+              right != null ? right.getSize() : 
RamUsageEstimator.NUM_BYTES_OBJECT_REF;
+        }
+        // ArrayList
+        lastValueMemCost +=
+            (long) seriesLastValues.size() * 
RamUsageEstimator.NUM_BYTES_OBJECT_REF
+                + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
+                + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
+        lastValueMemCost += RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
+        if (lastValueMemCost <= config.getCacheLastValuesMemoryBudgetInByte()) 
{
+          deviceLastValues
+              .computeIfAbsent(entry.getKey(), deviceID -> new ArrayList<>())
+              .addAll(seriesLastValues);
+        } else {
+          deviceLastValues = null;
+        }
       }
     }
     tsFileResource.setTimeIndex(newTimeIndex);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index 76e315b7050..87db5cd25c1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -60,6 +60,7 @@ import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.read.TimeValuePair;
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.TsPrimitiveType;
 import org.apache.tsfile.write.writer.TsFileIOWriter;
 import org.slf4j.Logger;
@@ -78,6 +79,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -543,33 +545,60 @@ public class LoadTsFileManager {
       if 
(IoTDBDescriptor.getInstance().getConfig().isCacheLastValuesForLoad()) {
         deviceLastValues = new HashMap<>();
       }
+      AtomicLong lastValuesMemCost = new AtomicLong(0);
+
       for (final ChunkGroupMetadata chunkGroupMetadata : 
writer.getChunkGroupMetadataList()) {
         final IDeviceID device = chunkGroupMetadata.getDevice();
         for (final ChunkMetadata chunkMetadata : 
chunkGroupMetadata.getChunkMetadataList()) {
           tsFileResource.updateStartTime(device, chunkMetadata.getStartTime());
           tsFileResource.updateEndTime(device, chunkMetadata.getEndTime());
           if (deviceLastValues != null) {
-            deviceLastValues
-                .computeIfAbsent(device, d -> new HashMap<>())
-                .compute(
-                    chunkMetadata.getMeasurementUid(),
-                    (m, oldPair) -> {
-                      if (oldPair != null && oldPair.getTimestamp() > 
chunkMetadata.getEndTime()) {
-                        return oldPair;
-                      }
-                      TsPrimitiveType lastValue =
-                          chunkMetadata.getStatistics() != null
-                                  && chunkMetadata.getDataType() != 
TSDataType.BLOB
-                              ? TsPrimitiveType.getByType(
-                                  chunkMetadata.getDataType() == 
TSDataType.VECTOR
-                                      ? TSDataType.INT64
-                                      : chunkMetadata.getDataType(),
-                                  chunkMetadata.getStatistics().getLastValue())
-                              : null;
-                      return lastValue != null
+            Map<String, TimeValuePair> deviceMap =
+                deviceLastValues.computeIfAbsent(
+                    device,
+                    d -> {
+                      Map<String, TimeValuePair> map = new HashMap<>();
+                      
lastValuesMemCost.addAndGet(RamUsageEstimator.shallowSizeOf(map));
+                      lastValuesMemCost.addAndGet(device.ramBytesUsed());
+                      return map;
+                    });
+            int prevSize = deviceMap.size();
+            deviceMap.compute(
+                chunkMetadata.getMeasurementUid(),
+                (m, oldPair) -> {
+                  if (oldPair != null && oldPair.getTimestamp() > 
chunkMetadata.getEndTime()) {
+                    return oldPair;
+                  }
+                  TsPrimitiveType lastValue =
+                      chunkMetadata.getStatistics() != null
+                              && chunkMetadata.getDataType() != TSDataType.BLOB
+                          ? TsPrimitiveType.getByType(
+                              chunkMetadata.getDataType() == TSDataType.VECTOR
+                                  ? TSDataType.INT64
+                                  : chunkMetadata.getDataType(),
+                              chunkMetadata.getStatistics().getLastValue())
+                          : null;
+                  TimeValuePair timeValuePair =
+                      lastValue != null
                           ? new TimeValuePair(chunkMetadata.getEndTime(), 
lastValue)
                           : null;
-                    });
+                  if (oldPair != null) {
+                    lastValuesMemCost.addAndGet(-oldPair.getSize());
+                  }
+                  if (timeValuePair != null) {
+                    lastValuesMemCost.addAndGet(timeValuePair.getSize());
+                  }
+                  return timeValuePair;
+                });
+            int afterSize = deviceMap.size();
+            lastValuesMemCost.addAndGet(
+                (afterSize - prevSize) * 
RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
+            if (lastValuesMemCost.get()
+                > IoTDBDescriptor.getInstance()
+                    .getConfig()
+                    .getCacheLastValuesMemoryBudgetInByte()) {
+              deviceLastValues = null;
+            }
           }
         }
       }

Reply via email to