This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fe128c7c21 Pipe: add memory control for leader cache (#11818)
9fe128c7c21 is described below

commit 9fe128c7c21e2b28b4124479c386c4335223468d
Author: Xuan Ronaldo <[email protected]>
AuthorDate: Wed Jan 3 10:46:37 2024 +0800

    Pipe: add memory control for leader cache (#11818)
---
 .../protocol/thrift/LeaderCacheManager.java        | 80 ++++++++++++++++++++--
 .../apache/iotdb/commons/conf/CommonConfig.java    |  9 +++
 .../iotdb/commons/conf/CommonDescriptor.java       |  5 ++
 .../iotdb/commons/pipe/config/PipeConfig.java      |  7 ++
 4 files changed, 97 insertions(+), 4 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/LeaderCacheManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/LeaderCacheManager.java
index 04067b898ab..ed70db59c69 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/LeaderCacheManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/LeaderCacheManager.java
@@ -20,19 +20,91 @@
 package org.apache.iotdb.db.pipe.connector.protocol.thrift;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Weigher;
+import com.google.common.util.concurrent.AtomicDouble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class LeaderCacheManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LeaderCacheManager.class);
+  private static final PipeConfig CONFIG = PipeConfig.getInstance();
+
+  private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
+
+  // leader cache built by LRU
+  private final Cache<String, TEndPoint> device2endpoint;
+  // a hashmap to reuse the created endpoint
+  private final ConcurrentHashMap<TEndPoint, TEndPoint> endPoints = new 
ConcurrentHashMap<>();
 
-  private final Map<String, TEndPoint> device2endpoint = new 
ConcurrentHashMap<>();
+  public LeaderCacheManager() {
+    long initMemorySizeInBytes = 
PipeResourceManager.memory().getTotalMemorySizeInBytes() / 10;
+    long maxMemorySizeInBytes =
+        (long)
+            (PipeResourceManager.memory().getTotalMemorySizeInBytes()
+                * CONFIG.getPipeLeaderCacheMemoryUsagePercentage());
+
+    // properties required by pipe memory control framework
+    PipeMemoryBlock allocatedMemoryBlock =
+        PipeResourceManager.memory()
+            .tryAllocate(initMemorySizeInBytes)
+            .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
+            .setShrinkCallback(
+                (oldMemory, newMemory) -> {
+                  memoryUsageCheatFactor.set(
+                      memoryUsageCheatFactor.get() * ((double) oldMemory / 
newMemory));
+                  LOGGER.info(
+                      "LeaderCacheManager.allocatedMemoryBlock has shrunk from 
{} to {}.",
+                      oldMemory,
+                      newMemory);
+                })
+            .setExpandMethod(
+                oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, 
maxMemorySizeInBytes))
+            .setExpandCallback(
+                (oldMemory, newMemory) -> {
+                  memoryUsageCheatFactor.set(
+                      memoryUsageCheatFactor.get() / ((double) newMemory / 
oldMemory));
+                  LOGGER.info(
+                      "LeaderCacheManager.allocatedMemoryBlock has expanded 
from {} to {}.",
+                      oldMemory,
+                      newMemory);
+                });
+
+    device2endpoint =
+        Caffeine.newBuilder()
+            .maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
+            .weigher(
+                (Weigher<String, TEndPoint>)
+                    (device, endPoint) -> {
+                      final long weightInLong =
+                          (long) (device.getBytes().length * 
memoryUsageCheatFactor.get());
+                      if (weightInLong <= 0) {
+                        return Integer.MAX_VALUE;
+                      }
+                      final int weightInInt = (int) weightInLong;
+                      return weightInInt != weightInLong ? Integer.MAX_VALUE : 
weightInInt;
+                    })
+            .recordStats()
+            .build();
+  }
 
   public TEndPoint getLeaderEndPoint(String deviceId) {
-    return deviceId == null ? null : device2endpoint.get(deviceId);
+    return deviceId == null ? null : device2endpoint.getIfPresent(deviceId);
   }
 
   public void updateLeaderEndPoint(String deviceId, TEndPoint endPoint) {
-    device2endpoint.put(deviceId, endPoint);
+    TEndPoint endPointFromMap = endPoints.putIfAbsent(endPoint, endPoint);
+    if (endPointFromMap != null) {
+      device2endpoint.put(deviceId, endPointFromMap);
+    } else {
+      device2endpoint.put(deviceId, endPoint);
+    }
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 1f89f950c50..525a0078272 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -194,6 +194,7 @@ public class CommonConfig {
   private long pipeMemoryAllocateMinSizeInBytes = 32;
   private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = 2 * 1024 * 
1024; // 2MB
   private long pipeMemoryExpanderIntervalSeconds = 3 * 60; // 3Min
+  private float PipeLeaderCacheMemoryUsagePercentage = 0.1F;
 
   /** Whether to use persistent schema mode. */
   private String schemaEngineMode = "Memory";
@@ -821,6 +822,14 @@ public class CommonConfig {
     this.pipeMemoryAllocateMinSizeInBytes = pipeMemoryAllocateMinSizeInBytes;
   }
 
+  public float getPipeLeaderCacheMemoryUsagePercentage() {
+    return PipeLeaderCacheMemoryUsagePercentage;
+  }
+
+  public void setPipeLeaderCacheMemoryUsagePercentage(float 
pipeLeaderCacheMemoryUsagePercentage) {
+    this.PipeLeaderCacheMemoryUsagePercentage = 
pipeLeaderCacheMemoryUsagePercentage;
+  }
+
   public String getSchemaEngineMode() {
     return schemaEngineMode;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index cd9404bb4e7..d767cbbf38f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -481,6 +481,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_memory_expander_interval_seconds",
                 
String.valueOf(config.getPipeMemoryExpanderIntervalSeconds()))));
+    config.setPipeLeaderCacheMemoryUsagePercentage(
+        Float.parseFloat(
+            properties.getProperty(
+                "pipe_leader_cache_memory_usage_percentage",
+                
String.valueOf(config.getPipeLeaderCacheMemoryUsagePercentage()))));
   }
 
   public void loadGlobalConfig(TGlobalConfig globalConfig) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index b16e588783f..792cd65e9f3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -127,6 +127,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber();
   }
 
+  public float getPipeLeaderCacheMemoryUsagePercentage() {
+    return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
+  }
+
   /////////////////////////////// Meta Consistency 
///////////////////////////////
 
   public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -274,6 +278,9 @@ public class PipeConfig {
     LOGGER.info(
         "PipeMemoryAllocateForTsFileSequenceReaderInBytes: {}",
         getPipeMemoryAllocateForTsFileSequenceReaderInBytes());
+
+    LOGGER.info(
+        "PipeLeaderCacheMemoryUsagePercentage: {}", 
getPipeLeaderCacheMemoryUsagePercentage());
   }
 
   /////////////////////////////// Singleton ///////////////////////////////

Reply via email to