This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/object_type by this push:
new 1c19dc9b652 try to fix OOM
1c19dc9b652 is described below
commit 1c19dc9b6524fc1adf054af0ae039c5bd6a6b5bb
Author: HTHou <[email protected]>
AuthorDate: Thu Jul 10 12:34:30 2025 +0800
try to fix OOM
---
.../plan/planner/plan/node/write/ObjectNode.java | 2 +-
.../storageengine/dataregion/wal/node/WALNode.java | 24 ++++++++++++++++++++--
2 files changed, 23 insertions(+), 3 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
index 27d06f55819..ebe839bbc0c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
@@ -327,7 +327,7 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
@Override
public long getMemorySize() {
- return content.length;
+ return contentLength;
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index ae5bb354bec..da972db78f4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -739,6 +739,8 @@ public class WALNode implements IWALNode {
AtomicBoolean notFirstFile = new AtomicBoolean(false);
AtomicBoolean hasCollectedSufficientData = new AtomicBoolean(false);
+ long memorySize = 0;
+
// try to collect current tmpNodes to insertNodes, return true if successfully collect an
// insert node
Runnable tryToCollectInsertNodeAndBumpIndex =
@@ -789,8 +791,10 @@ public class WALNode implements IWALNode {
tmpNodes
.get()
.add(new IoTConsensusRequest(((ObjectNode)
walEntry.getValue()).serialize()));
+ memorySize += ((ObjectNode)
walEntry.getValue()).getMemorySize();
} else {
tmpNodes.get().add(new IoTConsensusRequest(buffer));
+ memorySize += buffer.remaining();
}
} else {
// currentWalEntryIndex > targetIndex
@@ -803,12 +807,28 @@ public class WALNode implements IWALNode {
currentWalEntryIndex);
nextSearchIndex = currentWalEntryIndex;
}
- tmpNodes.get().add(new IoTConsensusRequest(buffer));
+ if (type == WALEntryType.OBJECT_FILE_NODE) {
+ WALEntry walEntry =
+ WALEntry.deserialize(
+ new DataInputStream(new
ByteArrayInputStream(buffer.array())));
+ // only be called by leader read from wal
+ // wal only has relativePath, offset, eof, length
+ // need to add WALEntryType + memtableId + relativePath, offset, eof, length +
+ // content
+ // need to add IoTConsensusRequest instead of ObjectNode
+ tmpNodes
+ .get()
+ .add(new IoTConsensusRequest(((ObjectNode)
walEntry.getValue()).serialize()));
+ memorySize += ((ObjectNode)
walEntry.getValue()).getMemorySize();
+ } else {
+ tmpNodes.get().add(new IoTConsensusRequest(buffer));
+ memorySize += buffer.remaining();
+ }
}
} else {
tryToCollectInsertNodeAndBumpIndex.run();
}
- if (hasCollectedSufficientData.get()) {
+ if (memorySize > config.getWalBufferSize() ||
hasCollectedSufficientData.get()) {
break COLLECT_FILE_LOOP;
}
}