This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9fe128c7c21 Pipe: add memory control for leader cache (#11818)
9fe128c7c21 is described below
commit 9fe128c7c21e2b28b4124479c386c4335223468d
Author: Xuan Ronaldo <[email protected]>
AuthorDate: Wed Jan 3 10:46:37 2024 +0800
Pipe: add memory control for leader cache (#11818)
---
.../protocol/thrift/LeaderCacheManager.java | 80 ++++++++++++++++++++--
.../apache/iotdb/commons/conf/CommonConfig.java | 9 +++
.../iotdb/commons/conf/CommonDescriptor.java | 5 ++
.../iotdb/commons/pipe/config/PipeConfig.java | 7 ++
4 files changed, 97 insertions(+), 4 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/LeaderCacheManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/LeaderCacheManager.java
index 04067b898ab..ed70db59c69 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/LeaderCacheManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/LeaderCacheManager.java
@@ -20,19 +20,91 @@
package org.apache.iotdb.db.pipe.connector.protocol.thrift;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Weigher;
+import com.google.common.util.concurrent.AtomicDouble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class LeaderCacheManager {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LeaderCacheManager.class);
+ private static final PipeConfig CONFIG = PipeConfig.getInstance();
+
+ private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
+
+ // leader cache built by LRU
+ private final Cache<String, TEndPoint> device2endpoint;
+ // a hashmap to reuse the created endpoint
+ private final ConcurrentHashMap<TEndPoint, TEndPoint> endPoints = new
ConcurrentHashMap<>();
- private final Map<String, TEndPoint> device2endpoint = new
ConcurrentHashMap<>();
+ public LeaderCacheManager() {
+ long initMemorySizeInBytes =
PipeResourceManager.memory().getTotalMemorySizeInBytes() / 10;
+ long maxMemorySizeInBytes =
+ (long)
+ (PipeResourceManager.memory().getTotalMemorySizeInBytes()
+ * CONFIG.getPipeLeaderCacheMemoryUsagePercentage());
+
+ // properties required by pipe memory control framework
+ PipeMemoryBlock allocatedMemoryBlock =
+ PipeResourceManager.memory()
+ .tryAllocate(initMemorySizeInBytes)
+ .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
+ .setShrinkCallback(
+ (oldMemory, newMemory) -> {
+ memoryUsageCheatFactor.set(
+ memoryUsageCheatFactor.get() * ((double) oldMemory /
newMemory));
+ LOGGER.info(
+ "LeaderCacheManager.allocatedMemoryBlock has shrunk from
{} to {}.",
+ oldMemory,
+ newMemory);
+ })
+ .setExpandMethod(
+ oldMemory -> Math.min(Math.max(oldMemory, 1) * 2,
maxMemorySizeInBytes))
+ .setExpandCallback(
+ (oldMemory, newMemory) -> {
+ memoryUsageCheatFactor.set(
+ memoryUsageCheatFactor.get() / ((double) newMemory /
oldMemory));
+ LOGGER.info(
+ "LeaderCacheManager.allocatedMemoryBlock has expanded
from {} to {}.",
+ oldMemory,
+ newMemory);
+ });
+
+ device2endpoint =
+ Caffeine.newBuilder()
+ .maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
+ .weigher(
+ (Weigher<String, TEndPoint>)
+ (device, endPoint) -> {
+ final long weightInLong =
+ (long) (device.getBytes().length *
memoryUsageCheatFactor.get());
+ if (weightInLong <= 0) {
+ return Integer.MAX_VALUE;
+ }
+ final int weightInInt = (int) weightInLong;
+ return weightInInt != weightInLong ? Integer.MAX_VALUE :
weightInInt;
+ })
+ .recordStats()
+ .build();
+ }
public TEndPoint getLeaderEndPoint(String deviceId) {
- return deviceId == null ? null : device2endpoint.get(deviceId);
+ return deviceId == null ? null : device2endpoint.getIfPresent(deviceId);
}
public void updateLeaderEndPoint(String deviceId, TEndPoint endPoint) {
- device2endpoint.put(deviceId, endPoint);
+ TEndPoint endPointFromMap = endPoints.putIfAbsent(endPoint, endPoint);
+ if (endPointFromMap != null) {
+ device2endpoint.put(deviceId, endPointFromMap);
+ } else {
+ device2endpoint.put(deviceId, endPoint);
+ }
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 1f89f950c50..525a0078272 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -194,6 +194,7 @@ public class CommonConfig {
private long pipeMemoryAllocateMinSizeInBytes = 32;
private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = 2 * 1024 *
1024; // 2MB
private long pipeMemoryExpanderIntervalSeconds = 3 * 60; // 3Min
+ private float PipeLeaderCacheMemoryUsagePercentage = 0.1F;
/** Whether to use persistent schema mode. */
private String schemaEngineMode = "Memory";
@@ -821,6 +822,14 @@ public class CommonConfig {
this.pipeMemoryAllocateMinSizeInBytes = pipeMemoryAllocateMinSizeInBytes;
}
+ public float getPipeLeaderCacheMemoryUsagePercentage() {
+ return PipeLeaderCacheMemoryUsagePercentage;
+ }
+
+ public void setPipeLeaderCacheMemoryUsagePercentage(float
pipeLeaderCacheMemoryUsagePercentage) {
+ this.PipeLeaderCacheMemoryUsagePercentage =
pipeLeaderCacheMemoryUsagePercentage;
+ }
+
public String getSchemaEngineMode() {
return schemaEngineMode;
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index cd9404bb4e7..d767cbbf38f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -481,6 +481,11 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_memory_expander_interval_seconds",
String.valueOf(config.getPipeMemoryExpanderIntervalSeconds()))));
+ config.setPipeLeaderCacheMemoryUsagePercentage(
+ Float.parseFloat(
+ properties.getProperty(
+ "pipe_leader_cache_memory_usage_percentage",
+
String.valueOf(config.getPipeLeaderCacheMemoryUsagePercentage()))));
}
public void loadGlobalConfig(TGlobalConfig globalConfig) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index b16e588783f..792cd65e9f3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -127,6 +127,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber();
}
+ public float getPipeLeaderCacheMemoryUsagePercentage() {
+ return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
+ }
+
/////////////////////////////// Meta Consistency
///////////////////////////////
public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -274,6 +278,9 @@ public class PipeConfig {
LOGGER.info(
"PipeMemoryAllocateForTsFileSequenceReaderInBytes: {}",
getPipeMemoryAllocateForTsFileSequenceReaderInBytes());
+
+ LOGGER.info(
+ "PipeLeaderCacheMemoryUsagePercentage: {}",
getPipeLeaderCacheMemoryUsagePercentage());
}
/////////////////////////////// Singleton ///////////////////////////////