This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch Wal_mem_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4fc85269c98292db01c00083aaf49b2691b9ee86
Author: HTHou <[email protected]>
AuthorDate: Wed Dec 4 18:24:12 2024 +0800

    init
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 13 +++-
 .../storageengine/buffer/CacheHitRatioMonitor.java |  4 +-
 .../dataregion/wal/buffer/WALBuffer.java           |  5 +-
 .../dataregion/wal/buffer/WALInfoEntry.java        | 25 +++++++
 .../storageengine/dataregion/wal/node/WALNode.java |  4 +-
 .../dataregion/wal/utils/WALEntryQueue.java        | 82 ++++++++++++++++++++++
 .../db/storageengine/rescon/memory/SystemInfo.java | 28 ++++++--
 .../buffer/CacheHitRatioMonitorTest.java           |  4 +-
 9 files changed, 161 insertions(+), 15 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 942fe993c56..1f9a8cd4ae2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -178,6 +178,9 @@ public class IoTDBConfig {
   /** The proportion of write memory for compaction */
   private double compactionProportion = 0.2;
 
+  /** The proportion of memtable memory for WAL queue */
+  private double walQueueProportion = 0.1;
+
   /** The proportion of memtable memory for device path cache */
   private double devicePathCacheProportion = 0.05;
 
@@ -3571,6 +3574,14 @@ public class IoTDBConfig {
     return compactionProportion;
   }
 
+  public double getWalQueueProportion() {
+    return walQueueProportion;
+  }
+
+  public void setWalQueueProportion(double walQueueProportion) {
+    this.walQueueProportion = walQueueProportion;
+  }
+
   public double getDevicePathCacheProportion() {
     return devicePathCacheProportion;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 5411556ca64..8b3f2b82e04 100755
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -330,6 +330,14 @@ public class IoTDBDescriptor {
                 .map(String::trim)
                 .orElse(Double.toString(conf.getRejectProportion())));
 
+    final double walQueueProportion =
+        Double.parseDouble(
+            Optional.ofNullable(
+                    properties.getProperty(
+                        "wal_queue_proportion", 
Double.toString(conf.getWalQueueProportion())))
+                .map(String::trim)
+                .orElse(Double.toString(conf.getWalQueueProportion())));
+
     final double devicePathCacheProportion =
         Double.parseDouble(
             Optional.ofNullable(
@@ -339,11 +347,12 @@ public class IoTDBDescriptor {
                 .map(String::trim)
                 .orElse(Double.toString(conf.getDevicePathCacheProportion())));
 
-    if (rejectProportion + devicePathCacheProportion >= 1) {
+    if (rejectProportion + walQueueProportion + devicePathCacheProportion >= 
1) {
       LOGGER.warn(
-          "The sum of write_memory_proportion and device_path_cache_proportion 
is too large, use default values 0.8 and 0.05.");
+          "The sum of reject_proportion, wal_queue_proportion and 
device_path_cache_proportion is too large, use default values 0.8, 0.1 and 
0.05.");
     } else {
       conf.setRejectProportion(rejectProportion);
+      conf.setWalQueueProportion(walQueueProportion);
       conf.setDevicePathCacheProportion(devicePathCacheProportion);
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitor.java
index 0d610c5af2f..503a3d35cbd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitor.java
@@ -151,12 +151,12 @@ public class CacheHitRatioMonitor implements 
CacheHitRatioMonitorMXBean, IServic
 
   @Override
   public double getFlushThershold() {
-    return SystemInfo.getInstance().getFlushThershold();
+    return SystemInfo.getInstance().getFlushThreshold();
   }
 
   @Override
   public double getRejectThershold() {
-    return SystemInfo.getInstance().getRejectThershold();
+    return SystemInfo.getInstance().getRejectThreshold();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
index 36ba97dadf0..cb0b732fcb5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.CheckpointMan
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.BrokenWALFileException;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALNodeClosedException;
 import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryQueue;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
@@ -57,8 +58,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -85,7 +84,7 @@ public class WALBuffer extends AbstractWALBuffer {
   // manage checkpoints
   private final CheckpointManager checkpointManager;
   // WALEntries
-  private final BlockingQueue<WALEntry> walEntries = new 
ArrayBlockingQueue<>(QUEUE_CAPACITY);
+  private final WALEntryQueue walEntries = new WALEntryQueue();
   // lock to provide synchronization for double buffers mechanism, protecting 
buffers status
   private final Lock buffersLock = new ReentrantLock();
   // condition to guarantee correctness of switching buffers
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
index e065dbc2b58..d89e16061cb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
@@ -21,7 +21,10 @@ package 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer;
 
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
 
 import java.util.ArrayList;
@@ -137,6 +140,28 @@ public class WALInfoEntry extends WALEntry {
     return false;
   }
 
+  public long getMemorySize() {
+    switch (type) {
+      case INSERT_TABLET_NODE:
+        ((InsertTabletNode) value).serializeToWAL(buffer, 
tabletInfo.tabletRangeList);
+        break;
+      case INSERT_ROW_NODE:
+      case INSERT_ROWS_NODE:
+        return InsertNodeMemoryEstimator.sizeOf((InsertNode) value);
+      case DELETE_DATA_NODE:
+      case RELATIONAL_DELETE_DATA_NODE:
+      case MEMORY_TABLE_SNAPSHOT:
+        return ((IMemTable) value).getTVListsRamCost();
+      case CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE:
+        value.serializeToWAL(buffer);
+        break;
+      case MEMORY_TABLE_CHECKPOINT:
+        throw new RuntimeException("Cannot serialize checkpoint to wal 
files.");
+      default:
+        throw new RuntimeException("Unsupported wal entry type " + type);
+    }
+  }
+
   @Override
   public int hashCode() {
     return Objects.hash(super.hashCode(), tabletInfo);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index 73a21772f0c..7028872fcad 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -410,14 +410,14 @@ public class WALNode implements IWALNode {
                               .getName()),
                       StringUtils.join(successfullyDeleted, ","),
                       fileIndexAfterFilterSafelyDeleteIndex,
-                      System.getProperty("line.separator")));
+                      System.lineSeparator()));
 
           if (!pinnedMemTableIds.isEmpty()) {
             summary
                 .append("- MemTable has been flushed but pinned by PIPE, the 
MemTableId list is : ")
                 .append(StringUtils.join(pinnedMemTableIds, ","))
                 .append(".")
-                .append(System.getProperty("line.separator"));
+                .append(System.lineSeparator());
           }
           if (fileIndexAfterFilterSafelyDeleteIndex < 
sortedWalFilesExcludingLast.length) {
             summary.append(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryQueue.java
new file mode 100644
index 00000000000..28aa3c8e514
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryQueue.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.wal.utils;
+
+import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALInfoEntry;
+import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A queue of pending {@link WALEntry} objects that charges the estimated memory footprint of
+ * every queued entry against the WAL-queue budget tracked by {@link SystemInfo}.
+ *
+ * <p>Producers calling {@link #put(WALEntry)} block while the budget is exhausted; consumers
+ * calling {@link #poll(long, TimeUnit)} or {@link #take()} give the memory back and wake blocked
+ * producers. The budget lives in the {@link SystemInfo} singleton, so one class-wide monitor is
+ * used: a release performed through any queue instance can wake a producer blocked on another.
+ */
+public class WALEntryQueue {
+
+  /** Class-wide monitor guarding reserve/release of the shared WAL-queue memory budget. */
+  private static final Object MEMORY_MONITOR = new Object();
+
+  private final BlockingQueue<WALEntry> queue;
+
+  public WALEntryQueue() {
+    queue = new LinkedBlockingQueue<>();
+  }
+
+  /**
+   * Retrieves and removes the head of the queue, waiting up to the given timeout.
+   *
+   * @param timeout how long to wait before giving up
+   * @param unit unit of {@code timeout}
+   * @return the head entry, or {@code null} if the timeout elapsed first
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public WALEntry poll(long timeout, TimeUnit unit) throws InterruptedException {
+    WALEntry e = queue.poll(timeout, unit);
+    if (e != null) {
+      releaseMemory(getElementSize(e));
+    }
+    return e;
+  }
+
+  /**
+   * Appends an entry, blocking until its estimated size fits into the WAL-queue budget.
+   *
+   * @param e the entry to enqueue
+   * @throws InterruptedException if interrupted while waiting for memory or while enqueuing
+   */
+  public void put(WALEntry e) throws InterruptedException {
+    long elementSize = getElementSize(e);
+    // Reserve before enqueuing so that a consumer can never release memory that has not been
+    // charged yet, which would drive the counter negative.
+    reserveMemory(elementSize);
+    try {
+      queue.put(e);
+    } catch (InterruptedException | RuntimeException ex) {
+      // The entry never entered the queue, so nobody else will release its reservation.
+      releaseMemory(elementSize);
+      throw ex;
+    }
+  }
+
+  /**
+   * Retrieves and removes the head of the queue, waiting as long as necessary.
+   *
+   * @return the head entry
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public WALEntry take() throws InterruptedException {
+    WALEntry e = queue.take();
+    releaseMemory(getElementSize(e));
+    return e;
+  }
+
+  public int size() {
+    return queue.size();
+  }
+
+  public boolean isEmpty() {
+    return queue.isEmpty();
+  }
+
+  /** Blocks until {@code size} bytes fit into the budget, then charges them. */
+  private static void reserveMemory(long size) throws InterruptedException {
+    if (size <= 0) {
+      return;
+    }
+    SystemInfo systemInfo = SystemInfo.getInstance();
+    synchronized (MEMORY_MONITOR) {
+      // An entry larger than the whole budget is admitted once nothing else is charged;
+      // otherwise it could never be enqueued and the producer would block forever.
+      while (systemInfo.cannotReserveMemoryForWalEntry(size)
+          && systemInfo.getCurrentWalQueueMemoryCost() > 0) {
+        MEMORY_MONITOR.wait();
+      }
+      systemInfo.updateWalQueueMemoryCost(size);
+    }
+  }
+
+  /** Returns {@code size} bytes to the budget and wakes every producer waiting for memory. */
+  private static void releaseMemory(long size) {
+    if (size <= 0) {
+      return;
+    }
+    synchronized (MEMORY_MONITOR) {
+      SystemInfo.getInstance().updateWalQueueMemoryCost(-size);
+      MEMORY_MONITOR.notifyAll();
+    }
+  }
+
+  /**
+   * Estimates the memory footprint of an entry. The same estimate is used when the entry is
+   * charged in {@link #put(WALEntry)} and when it is released on removal.
+   *
+   * @param walEntry the entry to measure
+   * @return estimated size in bytes; {@code 0} when the entry carries no value
+   */
+  private long getElementSize(WALEntry walEntry) {
+    // NOTE(review): signal entries presumably carry no value - confirm; guard against null so
+    // they cost nothing instead of failing.
+    if (walEntry.getValue() == null) {
+      return 0L;
+    }
+    if (!walEntry.isSignal() && walEntry.getValue() instanceof InsertNode) {
+      return InsertNodeMemoryEstimator.sizeOf((InsertNode) walEntry.getValue());
+    }
+    return walEntry.getValue().serializedSize();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
index 0458320af05..a784ce7beb6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
@@ -54,8 +54,9 @@ public class SystemInfo {
 
   private long memorySizeForMemtable;
   private long memorySizeForCompaction;
+  private long memorySizeForWalQueue;
   private long totalDirectBufferMemorySizeLimit;
-  private Map<DataRegionInfo, Long> reportedStorageGroupMemCostMap = new 
HashMap<>();
+  private final Map<DataRegionInfo, Long> reportedStorageGroupMemCostMap = new 
HashMap<>();
 
   private long flushingMemTablesCost = 0L;
   private final AtomicLong directBufferMemoryCost = new AtomicLong(0);
@@ -76,6 +77,8 @@ public class SystemInfo {
 
   private volatile boolean isEncodingFasterThanIo = true;
 
+  private final AtomicLong walEntryQueueMemoryCost = new AtomicLong(0);
+
   private SystemInfo() {
     allocateWriteMemory();
   }
@@ -113,7 +116,7 @@ public class SystemInfo {
       return true;
     } else {
       logger.info(
-          "Change system to reject status. Triggered by: logical SG ({}), mem 
cost delta ({}), totalSgMemCost ({}), REJECT_THERSHOLD ({})",
+          "Change system to reject status. Triggered by: logical SG ({}), mem 
cost delta ({}), totalSgMemCost ({}), REJECT_THRESHOLD ({})",
           dataRegionInfo.getDataRegion().getDatabaseName(),
           delta,
           totalStorageGroupMemCost,
@@ -403,6 +406,11 @@ public class SystemInfo {
             (config.getAllocateMemoryForStorageEngine() * 
config.getWriteProportionForMemtable());
     memorySizeForCompaction =
         (long) (config.getAllocateMemoryForStorageEngine() * 
config.getCompactionProportion());
+    memorySizeForWalQueue =
+        (long)
+            (config.getAllocateMemoryForStorageEngine()
+                * config.getWriteProportionForMemtable()
+                * config.getWalQueueProportion());
     FLUSH_THRESHOLD = memorySizeForMemtable * config.getFlushProportion();
     REJECT_THRESHOLD = memorySizeForMemtable * config.getRejectProportion();
     WritingMetrics.getInstance().recordFlushThreshold(FLUSH_THRESHOLD);
@@ -537,11 +545,23 @@ public class SystemInfo {
     return totalStorageGroupMemCost;
   }
 
-  public double getFlushThershold() {
+  public double getFlushThreshold() {
     return FLUSH_THRESHOLD;
   }
 
-  public double getRejectThershold() {
+  public double getRejectThreshold() {
     return REJECT_THRESHOLD;
   }
+
+  public long getCurrentWalQueueMemoryCost() {
+    return walEntryQueueMemoryCost.get();
+  }
+
+  public void updateWalQueueMemoryCost(long delta) {
+    walEntryQueueMemoryCost.addAndGet(delta);
+  }
+
+  public boolean cannotReserveMemoryForWalEntry(long walEntrySize) {
+    return walEntryQueueMemoryCost.get() + walEntrySize > 
memorySizeForWalQueue;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitorTest.java
index 8c5ea0246e6..b9152ec9132 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitorTest.java
@@ -77,11 +77,11 @@ public class CacheHitRatioMonitorTest {
           SystemInfo.getInstance().getTotalMemTableSize(),
           cacheHitRatioMonitor.getTotalMemTableSize());
       assertEquals(
-          SystemInfo.getInstance().getFlushThershold(),
+          SystemInfo.getInstance().getFlushThreshold(),
           cacheHitRatioMonitor.getFlushThershold(),
           delta);
       assertEquals(
-          SystemInfo.getInstance().getRejectThershold(),
+          SystemInfo.getInstance().getRejectThreshold(),
           cacheHitRatioMonitor.getRejectThershold(),
           delta);
       assertEquals(

Reply via email to