This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/object_type by this push:
     new 1c19dc9b652 try to fix OOM
1c19dc9b652 is described below

commit 1c19dc9b6524fc1adf054af0ae039c5bd6a6b5bb
Author: HTHou <[email protected]>
AuthorDate: Thu Jul 10 12:34:30 2025 +0800

    try to fix OOM
---
 .../plan/planner/plan/node/write/ObjectNode.java   |  2 +-
 .../storageengine/dataregion/wal/node/WALNode.java | 24 ++++++++++++++++++++--
 2 files changed, 23 insertions(+), 3 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
index 27d06f55819..ebe839bbc0c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
@@ -327,7 +327,7 @@ public class ObjectNode extends SearchNode implements 
WALEntryValue {
 
   @Override
   public long getMemorySize() {
-    return content.length;
+    return contentLength;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index ae5bb354bec..da972db78f4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -739,6 +739,8 @@ public class WALNode implements IWALNode {
       AtomicBoolean notFirstFile = new AtomicBoolean(false);
       AtomicBoolean hasCollectedSufficientData = new AtomicBoolean(false);
 
+      long memorySize = 0;
+
       // try to collect current tmpNodes to insertNodes, return true if 
successfully collect an
       // insert node
       Runnable tryToCollectInsertNodeAndBumpIndex =
@@ -789,8 +791,10 @@ public class WALNode implements IWALNode {
                   tmpNodes
                       .get()
                       .add(new IoTConsensusRequest(((ObjectNode) 
walEntry.getValue()).serialize()));
+                  memorySize += ((ObjectNode) 
walEntry.getValue()).getMemorySize();
                 } else {
                   tmpNodes.get().add(new IoTConsensusRequest(buffer));
+                  memorySize += buffer.remaining();
                 }
               } else {
                 // currentWalEntryIndex > targetIndex
@@ -803,12 +807,28 @@ public class WALNode implements IWALNode {
                       currentWalEntryIndex);
                   nextSearchIndex = currentWalEntryIndex;
                 }
-                tmpNodes.get().add(new IoTConsensusRequest(buffer));
+                if (type == WALEntryType.OBJECT_FILE_NODE) {
+                  WALEntry walEntry =
+                      WALEntry.deserialize(
+                          new DataInputStream(new 
ByteArrayInputStream(buffer.array())));
+                  // only be called by leader read from wal
+                  // wal only has relativePath, offset, eof, length
+                  // need to add WALEntryType + memtableId + relativePath, 
offset, eof, length +
+                  // content
+                  // need to add IoTConsensusRequest instead of ObjectNode
+                  tmpNodes
+                      .get()
+                      .add(new IoTConsensusRequest(((ObjectNode) 
walEntry.getValue()).serialize()));
+                  memorySize += ((ObjectNode) 
walEntry.getValue()).getMemorySize();
+                } else {
+                  tmpNodes.get().add(new IoTConsensusRequest(buffer));
+                  memorySize += buffer.remaining();
+                }
               }
             } else {
               tryToCollectInsertNodeAndBumpIndex.run();
             }
-            if (hasCollectedSufficientData.get()) {
+            if (memorySize > config.getWalBufferSize() || 
hasCollectedSufficientData.get()) {
               break COLLECT_FILE_LOOP;
             }
           }

Reply via email to