This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch Wal_mem_control in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0eb13b7d2c87d1d6ec53a2178a493e8fc8d2ea39 Author: HTHou <[email protected]> AuthorDate: Thu Dec 5 09:41:08 2024 +0800 dev more --- .../plan/planner/plan/node/write/InsertNode.java | 10 ++++++++ .../dataregion/wal/buffer/WALEntry.java | 2 ++ .../dataregion/wal/buffer/WALInfoEntry.java | 27 ++++++++++++++-------- .../dataregion/wal/buffer/WALSignalEntry.java | 5 ++++ .../dataregion/wal/utils/WALEntryQueue.java | 13 +---------- 5 files changed, 36 insertions(+), 21 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java index b13250a3784..9161d7ca26a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; @@ -85,6 +86,8 @@ public abstract class InsertNode extends SearchNode { protected ProgressIndex progressIndex; + protected long memorySize; + private static final DeviceIDFactory deviceIDFactory = DeviceIDFactory.getInstance(); protected InsertNode(PlanNodeId id) { @@ -418,4 +421,11 @@ public abstract class InsertNode extends SearchNode { return DataNodeDevicePathCache.getInstance() .getPartialPath(ReadWriteIOUtils.readString(stream)); } + + public long getMemorySize() { + if (memorySize == 0) { + memorySize = InsertNodeMemoryEstimator.sizeOf(this); + } + return memorySize; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java index 82d44213483..bb5969dde9d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java @@ -196,4 +196,6 @@ public abstract class WALEntry implements SerializedSize { } public abstract boolean isSignal(); + + public abstract long getMemorySize(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java index d89e16061cb..6da50edba4f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java @@ -21,12 +21,13 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.buffer; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; +import org.apache.tsfile.utils.RamUsageEstimator; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -105,6 +106,14 @@ public class WALInfoEntry extends WALEntry { this.tabletRangeList = new ArrayList<>(tabletRangeList); } + public int getRangeRowCount() { + int count = 0; + for (int[] range : tabletRangeList) { + count += range[1] - range[0]; + } + return count; + } + @Override public int hashCode() { return Objects.hash(tabletRangeList); @@ -140,23 +149,23 @@ public class WALInfoEntry extends WALEntry { return false; } + @Override public long getMemorySize() { switch (type) { case INSERT_TABLET_NODE: - ((InsertTabletNode) value).serializeToWAL(buffer, tabletInfo.tabletRangeList); - break; + return ((InsertNode) value).getMemorySize() + / ((InsertTabletNode) value).getRowCount() + * tabletInfo.getRangeRowCount(); case INSERT_ROW_NODE: case INSERT_ROWS_NODE: - return InsertNodeMemoryEstimator.sizeOf((InsertNode) value); - case DELETE_DATA_NODE: - case RELATIONAL_DELETE_DATA_NODE: + return ((InsertNode) value).getMemorySize(); case MEMORY_TABLE_SNAPSHOT: return ((IMemTable) value).getTVListsRamCost(); + case DELETE_DATA_NODE: + case RELATIONAL_DELETE_DATA_NODE: case CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE: - value.serializeToWAL(buffer); - break; case MEMORY_TABLE_CHECKPOINT: - throw new RuntimeException("Cannot serialize checkpoint to wal files."); + return RamUsageEstimator.sizeOfObject(value); default: throw new RuntimeException("Unsupported wal entry type " + type); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALSignalEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALSignalEntry.java index 0839659dd45..86064d3bf2f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALSignalEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALSignalEntry.java @@ -57,4 +57,9 @@ public class WALSignalEntry extends WALEntry { public boolean isSignal() { return true; } + + @Override + public long getMemorySize() { + return Byte.BYTES; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryQueue.java index 28aa3c8e514..a2bec68ccf6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryQueue.java @@ -19,10 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.utils; -import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; -import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALInfoEntry; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import java.util.concurrent.BlockingQueue; @@ -69,14 +66,6 @@ public class WALEntryQueue { } private long getElementSize(WALEntry walEntry) { - if (walEntry.isSignal()) { - return walEntry.getValue().serializedSize(); - } else { - return ((WALInfoEntry) walEntry).getValue() - } - if (walEntry.getValue() instanceof InsertNode) { - return InsertNodeMemoryEstimator.sizeOf((InsertNode) walEntry.getValue()); - } - return walEntry.getValue().serializedSize(); + return walEntry.getMemorySize(); } }
