This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 761f4e73f54 Pipe: Fixed the problem of not being able to write normally due to insufficient memory (#15701)
761f4e73f54 is described below

commit 761f4e73f5450681de534d59d1344393976fb184
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Jun 11 18:55:45 2025 +0800

    Pipe: Fixed the problem of not being able to write normally due to insufficient memory (#15701)
    
    * Pipe: Fixed the problem of not being able to write normally due to insufficient memory
    
    * fix
    
    * fix
    
    * fix
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ---
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  6 --
 .../overview/PipeWALInsertNodeCacheMetrics.java    | 92 +++-------------------
 .../wal/checkpoint/CheckpointManager.java          |  4 +-
 .../dataregion/wal/utils/WALEntryPosition.java     |  2 +-
 .../dataregion/wal/utils/WALInsertNodeCache.java   | 43 ++++------
 .../dataregion/wal/node/WALEntryHandlerTest.java   |  2 +-
 .../wal/node/WalDeleteOutdatedNewTest.java         |  2 +-
 .../wal/utils/WALInsertNodeCacheTest.java          |  2 +-
 9 files changed, 30 insertions(+), 133 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index db77ef7ce2d..8137959a391 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -151,8 +151,6 @@ public class IoTDBConfig {
 
   private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 
10;
 
-  private long allocateMemoryPerWalCache = 512 * 1024;
-
   /** Flush proportion for system */
   private double flushProportion = 0.4;
 
@@ -2004,14 +2002,6 @@ public class IoTDBConfig {
     this.writeMemoryVariationReportProportion = 
writeMemoryVariationReportProportion;
   }
 
-  public long getAllocateMemoryPerWalCache() {
-    return allocateMemoryPerWalCache;
-  }
-
-  public void setAllocateMemoryPerWalCache(final long 
allocateMemoryForWalCache) {
-    this.allocateMemoryPerWalCache = allocateMemoryForWalCache;
-  }
-
   public boolean isEnablePartialInsert() {
     return enablePartialInsert;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 155aab7c5b5..dfd854f724c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2458,12 +2458,6 @@ public class IoTDBDescriptor {
     conf.setIotConsensusV2DeletionFileDir(
         properties.getProperty(
             "iot_consensus_v2_deletion_file_dir", 
conf.getIotConsensusV2DeletionFileDir()));
-
-    conf.setAllocateMemoryPerWalCache(
-        Long.parseLong(
-            properties.getProperty(
-                "allocate_memory_per_wal_cache",
-                Long.toString(conf.getAllocateMemoryPerWalCache()))));
   }
 
   private void loadCQProps(TrimProperties properties) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeWALInsertNodeCacheMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeWALInsertNodeCacheMetrics.java
index deabdcc2f10..2c4ee76153f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeWALInsertNodeCacheMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeWALInsertNodeCacheMetrics.java
@@ -27,117 +27,43 @@ import org.apache.iotdb.metrics.metricsets.IMetricSet;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.MetricType;
 
-import com.google.common.collect.ImmutableSet;
-import org.checkerframework.checker.nullness.qual.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-
 public class PipeWALInsertNodeCacheMetrics implements IMetricSet {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeWALInsertNodeCacheMetrics.class);
 
-  @SuppressWarnings("java:S3077")
-  private volatile AbstractMetricService metricService;
-
-  private final Map<Integer, WALInsertNodeCache> cacheMap = new 
ConcurrentHashMap<>();
-
   //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
 
   @Override
   public void bindTo(AbstractMetricService metricService) {
-    this.metricService = metricService;
-    ImmutableSet<Integer> dataRegionIds = 
ImmutableSet.copyOf(cacheMap.keySet());
-    for (Integer dataRegionId : dataRegionIds) {
-      createMetrics(dataRegionId);
-    }
-  }
-
-  private void createMetrics(Integer dataRegionId) {
-    createAutoGauge(dataRegionId);
-  }
-
-  private void createAutoGauge(Integer dataRegionId) {
     metricService.createAutoGauge(
         Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE.toString(),
         MetricLevel.IMPORTANT,
-        cacheMap.get(dataRegionId),
-        WALInsertNodeCache::getCacheHitRate,
-        Tag.REGION.toString(),
-        String.valueOf(dataRegionId));
+        WALInsertNodeCache.getInstance(),
+        WALInsertNodeCache::getCacheHitRate);
     metricService.createAutoGauge(
         Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_COUNT.toString(),
         MetricLevel.IMPORTANT,
-        cacheMap.get(dataRegionId),
-        WALInsertNodeCache::getCacheHitCount,
-        Tag.REGION.toString(),
-        String.valueOf(dataRegionId));
+        WALInsertNodeCache.getInstance(),
+        WALInsertNodeCache::getCacheHitCount);
     metricService.createAutoGauge(
         Metric.PIPE_WAL_INSERT_NODE_CACHE_REQUEST_COUNT.toString(),
         MetricLevel.IMPORTANT,
-        cacheMap.get(dataRegionId),
+        WALInsertNodeCache.getInstance(),
         WALInsertNodeCache::getCacheRequestCount,
-        Tag.REGION.toString(),
-        String.valueOf(dataRegionId));
+        Tag.REGION.toString());
   }
 
   @Override
   public void unbindFrom(AbstractMetricService metricService) {
-    ImmutableSet<Integer> dataRegionIds = 
ImmutableSet.copyOf(cacheMap.keySet());
-    for (Integer dataRegionId : dataRegionIds) {
-      deregister(dataRegionId);
-    }
-    if (!cacheMap.isEmpty()) {
-      LOGGER.warn("Failed to unbind from wal insert node cache metrics, cache 
map not empty");
-    }
-  }
-
-  private void removeMetrics(Integer dataRegionId) {
-    removeAutoGauge(dataRegionId);
-  }
-
-  private void removeAutoGauge(Integer dataRegionId) {
     metricService.remove(
-        MetricType.AUTO_GAUGE,
-        Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE.toString(),
-        Tag.REGION.toString(),
-        String.valueOf(dataRegionId));
+        MetricType.AUTO_GAUGE, 
Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE.toString());
     metricService.remove(
-        MetricType.AUTO_GAUGE,
-        Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_COUNT.toString(),
-        Tag.REGION.toString(),
-        String.valueOf(dataRegionId));
+        MetricType.AUTO_GAUGE, 
Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_COUNT.toString());
     metricService.remove(
-        MetricType.AUTO_GAUGE,
-        Metric.PIPE_WAL_INSERT_NODE_CACHE_REQUEST_COUNT.toString(),
-        Tag.REGION.toString(),
-        String.valueOf(dataRegionId));
-  }
-
-  //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
-
-  public void register(@NonNull WALInsertNodeCache walInsertNodeCache, Integer 
dataRegionId) {
-    cacheMap.putIfAbsent(dataRegionId, walInsertNodeCache);
-    if (Objects.nonNull(metricService)) {
-      createMetrics(dataRegionId);
-    }
-  }
-
-  public void deregister(Integer dataRegionId) {
-    // TODO: waiting called by WALInsertNodeCache
-    if (!cacheMap.containsKey(dataRegionId)) {
-      LOGGER.warn(
-          "Failed to deregister wal insert node cache metrics, 
WALInsertNodeCache({}) does not exist",
-          dataRegionId);
-      return;
-    }
-    if (Objects.nonNull(metricService)) {
-      removeMetrics(dataRegionId);
-    }
-    cacheMap.remove(dataRegionId);
+        MetricType.AUTO_GAUGE, 
Metric.PIPE_WAL_INSERT_NODE_CACHE_REQUEST_COUNT.toString());
   }
 
   //////////////////////////// singleton ////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
index 1c859f1eae4..afa6651dfa4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
@@ -279,7 +279,7 @@ public class CheckpointManager implements AutoCloseable {
       }
       MemTableInfo memTableInfo = memTableId2Info.get(memTableId);
       if (!memTableInfo.isPinned()) {
-        
WALInsertNodeCache.getInstance(memTableInfo.getDataRegionId()).addMemTable(memTableId);
+        WALInsertNodeCache.getInstance().addMemTable(memTableId);
       }
       memTableInfo.pin();
     } finally {
@@ -309,7 +309,7 @@ public class CheckpointManager implements AutoCloseable {
       MemTableInfo memTableInfo = memTableId2Info.get(memTableId);
       memTableInfo.unpin();
       if (!memTableInfo.isPinned()) {
-        
WALInsertNodeCache.getInstance(memTableInfo.getDataRegionId()).removeMemTable(memTableId);
+        WALInsertNodeCache.getInstance().removeMemTable(memTableId);
         if (memTableInfo.isFlushed()) {
           memTableId2Info.remove(memTableId);
         }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
index a5622d29c49..06ced2c32fa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
@@ -190,7 +190,7 @@ public class WALEntryPosition {
   public void setWalNode(WALNode walNode, long memTableId) {
     this.walNode = walNode;
     identifier = walNode.getIdentifier();
-    cache = WALInsertNodeCache.getInstance(walNode.getRegionId(memTableId));
+    cache = WALInsertNodeCache.getInstance();
   }
 
   public String getIdentifier() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 46c3d962754..431e8d0ba51 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.DataNodeMemoryConfig;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.metric.overview.PipeWALInsertNodeCacheMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
@@ -39,7 +38,6 @@ import com.github.benmanes.caffeine.cache.CacheLoader;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.github.benmanes.caffeine.cache.Weigher;
-import com.google.common.util.concurrent.AtomicDouble;
 import org.apache.tsfile.utils.Pair;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -65,10 +63,6 @@ public class WALInsertNodeCache {
 
   private static PipeModelFixedMemoryBlock walModelFixedMemory = null;
 
-  private final PipeModelFixedMemoryBlock memoryBlock;
-
-  // Used to adjust the memory usage of the cache
-  private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
   // LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
   private final LoadingCache<WALEntryPosition, Pair<ByteBuffer, InsertNode>> 
lruCache;
 
@@ -77,16 +71,15 @@ public class WALInsertNodeCache {
 
   private volatile boolean hasPipeRunning = false;
 
-  private WALInsertNodeCache(final Integer dataRegionId) {
+  private WALInsertNodeCache() {
     if (walModelFixedMemory == null) {
       init();
     }
 
-    final long requestedAllocateSize = CONFIG.getAllocateMemoryPerWalCache();
-
-    memoryBlock =
-        PipeDataNodeResourceManager.memory()
-            .forceAllocateForModelFixedMemoryBlock(requestedAllocateSize, 
PipeMemoryBlockType.WAL);
+    final long requestedAllocateSize =
+        (long)
+            
(PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()
+                * PIPE_CONFIG.getPipeDataStructureWalMemoryProportion());
 
     lruCache =
         Caffeine.newBuilder()
@@ -96,12 +89,9 @@ public class WALInsertNodeCache {
                     (position, pair) -> {
                       long weightInLong = 0L;
                       if (pair.right != null) {
-                        weightInLong =
-                            (long)
-                                (InsertNodeMemoryEstimator.sizeOf(pair.right)
-                                    * memoryUsageCheatFactor.get());
+                        weightInLong = 
InsertNodeMemoryEstimator.sizeOf(pair.right);
                       } else {
-                        weightInLong = (long) (position.getSize() * 
memoryUsageCheatFactor.get());
+                        weightInLong = position.getSize();
                       }
                       if (weightInLong <= 0) {
                         return Integer.MAX_VALUE;
@@ -111,8 +101,6 @@ public class WALInsertNodeCache {
                     })
             .recordStats()
             .build(new WALInsertNodeCacheLoader());
-
-    PipeWALInsertNodeCacheMetrics.getInstance().register(this, dataRegionId);
   }
 
   // please call this method at PipeLauncher
@@ -124,7 +112,11 @@ public class WALInsertNodeCache {
       // Allocate memory for the fixed memory block of WAL
       walModelFixedMemory =
           PipeDataNodeResourceManager.memory()
-              .forceAllocateForModelFixedMemoryBlock(0L, 
PipeMemoryBlockType.WAL);
+              .forceAllocateForModelFixedMemoryBlock(
+                  (long)
+                      
(PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()
+                          * 
PIPE_CONFIG.getPipeDataStructureWalMemoryProportion()),
+                  PipeMemoryBlockType.WAL);
     } catch (Exception e) {
       LOGGER.error("Failed to initialize WAL model fixed memory block", e);
       walModelFixedMemory =
@@ -318,17 +310,13 @@ public class WALInsertNodeCache {
 
   /////////////////////////// Singleton ///////////////////////////
 
-  public static WALInsertNodeCache getInstance(final Integer regionId) {
-    return InstanceHolder.getOrCreateInstance(regionId);
+  public static WALInsertNodeCache getInstance() {
+    return InstanceHolder.INSTANCE;
   }
 
   private static class InstanceHolder {
 
-    private static final Map<Integer, WALInsertNodeCache> INSTANCE_MAP = new 
ConcurrentHashMap<>();
-
-    public static WALInsertNodeCache getOrCreateInstance(final Integer key) {
-      return INSTANCE_MAP.computeIfAbsent(key, k -> new 
WALInsertNodeCache(key));
-    }
+    public static final WALInsertNodeCache INSTANCE = new WALInsertNodeCache();
 
     private InstanceHolder() {
       // forbidding instantiation
@@ -345,7 +333,6 @@ public class WALInsertNodeCache {
   @TestOnly
   public void clear() {
     lruCache.invalidateAll();
-    memoryBlock.close();
     memTablesNeedSearch.clear();
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
index 0496d58d7dc..d5913e54355 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
@@ -94,7 +94,7 @@ public class WALEntryHandlerTest {
     config.setWalMode(prevMode);
     EnvironmentUtils.cleanDir(logDirectory1);
     EnvironmentUtils.cleanDir(logDirectory2);
-    WALInsertNodeCache.getInstance(1).clear();
+    WALInsertNodeCache.getInstance().clear();
   }
 
   @Test(expected = MemTablePinException.class)
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
index 7109a14b030..593d25b6932 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
@@ -88,7 +88,7 @@ public class WalDeleteOutdatedNewTest {
     config.setDataRegionConsensusProtocolClass(prevConsensus);
     EnvironmentUtils.cleanDir(logDirectory1);
     StorageEngine.getInstance().reset();
-    WALInsertNodeCache.getInstance(1).clear();
+    WALInsertNodeCache.getInstance().clear();
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
index d1e20b4b2ef..552c8334f95 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
@@ -54,7 +54,7 @@ public class WALInsertNodeCacheTest {
   private static final String databasePath = "root.test_sg";
   private static final String devicePath = databasePath + ".test_d";
   private static final String dataRegionId = "1";
-  private static final WALInsertNodeCache cache = 
WALInsertNodeCache.getInstance(1);
+  private static final WALInsertNodeCache cache = 
WALInsertNodeCache.getInstance();
   private WALMode prevMode;
   private WALNode walNode;
 

Reply via email to