This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch Wal_mem_control in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4fc85269c98292db01c00083aaf49b2691b9ee86 Author: HTHou <[email protected]> AuthorDate: Wed Dec 4 18:24:12 2024 +0800 init --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +++ .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 13 +++- .../storageengine/buffer/CacheHitRatioMonitor.java | 4 +- .../dataregion/wal/buffer/WALBuffer.java | 5 +- .../dataregion/wal/buffer/WALInfoEntry.java | 25 +++++++ .../storageengine/dataregion/wal/node/WALNode.java | 4 +- .../dataregion/wal/utils/WALEntryQueue.java | 82 ++++++++++++++++++++++ .../db/storageengine/rescon/memory/SystemInfo.java | 28 ++++++-- .../buffer/CacheHitRatioMonitorTest.java | 4 +- 9 files changed, 161 insertions(+), 15 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 942fe993c56..1f9a8cd4ae2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -178,6 +178,9 @@ public class IoTDBConfig { /** The proportion of write memory for compaction */ private double compactionProportion = 0.2; + /** The proportion of memtable memory for WAL queue */ + private double walQueueProportion = 0.1; + /** The proportion of memtable memory for device path cache */ private double devicePathCacheProportion = 0.05; @@ -3571,6 +3574,14 @@ public class IoTDBConfig { return compactionProportion; } + public double getWalQueueProportion() { + return walQueueProportion; + } + + public void setWalQueueProportion(double walQueueProportion) { + this.walQueueProportion = walQueueProportion; + } + public double getDevicePathCacheProportion() { return devicePathCacheProportion; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 5411556ca64..8b3f2b82e04 100755 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -330,6 +330,14 @@ public class IoTDBDescriptor { .map(String::trim) .orElse(Double.toString(conf.getRejectProportion()))); + final double walQueueProportion = + Double.parseDouble( + Optional.ofNullable( + properties.getProperty( + "wal_queue_proportion", Double.toString(conf.getWalQueueProportion()))) + .map(String::trim) + .orElse(Double.toString(conf.getWalQueueProportion()))); + final double devicePathCacheProportion = Double.parseDouble( Optional.ofNullable( @@ -339,11 +347,12 @@ public class IoTDBDescriptor { .map(String::trim) .orElse(Double.toString(conf.getDevicePathCacheProportion()))); - if (rejectProportion + devicePathCacheProportion >= 1) { + if (rejectProportion + walQueueProportion + devicePathCacheProportion >= 1) { LOGGER.warn( - "The sum of write_memory_proportion and device_path_cache_proportion is too large, use default values 0.8 and 0.05."); + "The sum of reject_proportion, wal_queue_proportion and device_path_cache_proportion is too large, use default values 0.8, 0.1 and 0.05."); } else { conf.setRejectProportion(rejectProportion); + conf.setWalQueueProportion(walQueueProportion); conf.setDevicePathCacheProportion(devicePathCacheProportion); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitor.java index 0d610c5af2f..503a3d35cbd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitor.java @@ -151,12 +151,12 @@ public class CacheHitRatioMonitor implements CacheHitRatioMonitorMXBean, IServic @Override public double getFlushThershold() { - return SystemInfo.getInstance().getFlushThershold(); + return SystemInfo.getInstance().getFlushThreshold(); } @Override public double getRejectThershold() { - return SystemInfo.getInstance().getRejectThershold(); + return SystemInfo.getInstance().getRejectThreshold(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java index 36ba97dadf0..cb0b732fcb5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.CheckpointMan import org.apache.iotdb.db.storageengine.dataregion.wal.exception.BrokenWALFileException; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALNodeClosedException; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData; +import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryQueue; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; @@ -57,8 +58,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -85,7 +84,7 @@ public class WALBuffer extends AbstractWALBuffer { // manage checkpoints private final CheckpointManager checkpointManager; // WALEntries - private final BlockingQueue<WALEntry> walEntries = new ArrayBlockingQueue<>(QUEUE_CAPACITY); + private final WALEntryQueue walEntries = new WALEntryQueue(); // lock to provide synchronization for double buffers mechanism, protecting buffers status private final Lock buffersLock = new ReentrantLock(); // condition to guarantee correctness of switching buffers diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java index e065dbc2b58..d89e16061cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java @@ -21,7 +21,10 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.buffer; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; import java.util.ArrayList; @@ -137,6 +140,28 @@ public class WALInfoEntry extends WALEntry { return false; } + public long getMemorySize() { + switch (type) { + case INSERT_TABLET_NODE: + ((InsertTabletNode) value).serializeToWAL(buffer, tabletInfo.tabletRangeList); + break; + case INSERT_ROW_NODE: + case INSERT_ROWS_NODE: + return InsertNodeMemoryEstimator.sizeOf((InsertNode) value); + case DELETE_DATA_NODE: + case RELATIONAL_DELETE_DATA_NODE: + case MEMORY_TABLE_SNAPSHOT: + return ((IMemTable) value).getTVListsRamCost(); + case CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE: + value.serializeToWAL(buffer); + break; + case MEMORY_TABLE_CHECKPOINT: + throw new RuntimeException("Cannot serialize checkpoint to wal files."); + default: + throw new RuntimeException("Unsupported wal entry type " + type); + } + } + @Override public int hashCode() { return Objects.hash(super.hashCode(), tabletInfo); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java index 73a21772f0c..7028872fcad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java @@ -410,14 +410,14 @@ public class WALNode implements IWALNode { .getName()), StringUtils.join(successfullyDeleted, ","), fileIndexAfterFilterSafelyDeleteIndex, - System.getProperty("line.separator"))); + System.lineSeparator())); if (!pinnedMemTableIds.isEmpty()) { summary .append("- MemTable has been flushed but pinned by PIPE, the MemTableId list is : ") .append(StringUtils.join(pinnedMemTableIds, ",")) .append(".") - .append(System.getProperty("line.separator")); + .append(System.lineSeparator()); } if (fileIndexAfterFilterSafelyDeleteIndex < sortedWalFilesExcludingLast.length) { summary.append( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryQueue.java new file mode 100644 index 00000000000..28aa3c8e514 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryQueue.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.wal.utils; + +import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; +import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; +import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALInfoEntry; +import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class WALEntryQueue { + + private final BlockingQueue<WALEntry> queue; + + public WALEntryQueue() { + queue = new LinkedBlockingQueue<>(); + } + + public WALEntry poll(long timeout, TimeUnit unit) throws InterruptedException { + WALEntry e = queue.poll(timeout, unit); + if (e != null) { + SystemInfo.getInstance().updateWalQueueMemoryCost(-getElementSize(e)); + } + return e; + } + + public void put(WALEntry e) throws InterruptedException { + long elementSize = getElementSize(e); + while (SystemInfo.getInstance().cannotReserveMemoryForWalEntry(elementSize)) { + wait(); + } + queue.put(e); + SystemInfo.getInstance().updateWalQueueMemoryCost(elementSize); + } + + public WALEntry take() throws InterruptedException { + WALEntry e = queue.take(); + SystemInfo.getInstance().updateWalQueueMemoryCost(-getElementSize(e)); + return e; + } + + public int size() { + return queue.size(); + } + + public boolean isEmpty() { + return queue.isEmpty(); + } + + private long getElementSize(WALEntry walEntry) { + if (walEntry.isSignal()) { + return walEntry.getValue().serializedSize(); + } else { + return ((WALInfoEntry) walEntry).getValue() + } + if (walEntry.getValue() instanceof InsertNode) { + return InsertNodeMemoryEstimator.sizeOf((InsertNode) walEntry.getValue()); + } + return walEntry.getValue().serializedSize(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java index 0458320af05..a784ce7beb6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java @@ -54,8 +54,9 @@ public class SystemInfo { private long memorySizeForMemtable; private long memorySizeForCompaction; + private long memorySizeForWalQueue; private long totalDirectBufferMemorySizeLimit; - private Map<DataRegionInfo, Long> reportedStorageGroupMemCostMap = new HashMap<>(); + private final Map<DataRegionInfo, Long> reportedStorageGroupMemCostMap = new HashMap<>(); private long flushingMemTablesCost = 0L; private final AtomicLong directBufferMemoryCost = new AtomicLong(0); @@ -76,6 +77,8 @@ public class SystemInfo { private volatile boolean isEncodingFasterThanIo = true; + private final AtomicLong walEntryQueueMemoryCost = new AtomicLong(0); + private SystemInfo() { allocateWriteMemory(); } @@ -113,7 +116,7 @@ public class SystemInfo { return true; } else { logger.info( - "Change system to reject status. Triggered by: logical SG ({}), mem cost delta ({}), totalSgMemCost ({}), REJECT_THERSHOLD ({})", + "Change system to reject status. Triggered by: logical SG ({}), mem cost delta ({}), totalSgMemCost ({}), REJECT_THRESHOLD ({})", dataRegionInfo.getDataRegion().getDatabaseName(), delta, totalStorageGroupMemCost, @@ -403,6 +406,11 @@ public class SystemInfo { (config.getAllocateMemoryForStorageEngine() * config.getWriteProportionForMemtable()); memorySizeForCompaction = (long) (config.getAllocateMemoryForStorageEngine() * config.getCompactionProportion()); + memorySizeForWalQueue = + (long) + (config.getAllocateMemoryForStorageEngine() + * config.getWriteProportionForMemtable() + * config.getWalQueueProportion()); FLUSH_THRESHOLD = memorySizeForMemtable * config.getFlushProportion(); REJECT_THRESHOLD = memorySizeForMemtable * config.getRejectProportion(); WritingMetrics.getInstance().recordFlushThreshold(FLUSH_THRESHOLD); @@ -537,11 +545,23 @@ public class SystemInfo { return totalStorageGroupMemCost; } - public double getFlushThershold() { + public double getFlushThreshold() { return FLUSH_THRESHOLD; } - public double getRejectThershold() { + public double getRejectThreshold() { return REJECT_THRESHOLD; } + + public long getCurrentWalQueueMemoryCost() { + return walEntryQueueMemoryCost.get(); + } + + public void updateWalQueueMemoryCost(long delta) { + walEntryQueueMemoryCost.addAndGet(delta); + } + + public boolean cannotReserveMemoryForWalEntry(long walEntrySize) { + return walEntryQueueMemoryCost.get() + walEntrySize > memorySizeForWalQueue; + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitorTest.java index 8c5ea0246e6..b9152ec9132 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitorTest.java @@ -77,11 +77,11 @@ public class CacheHitRatioMonitorTest { SystemInfo.getInstance().getTotalMemTableSize(), cacheHitRatioMonitor.getTotalMemTableSize()); assertEquals( - SystemInfo.getInstance().getFlushThershold(), + SystemInfo.getInstance().getFlushThreshold(), cacheHitRatioMonitor.getFlushThershold(), delta); assertEquals( - SystemInfo.getInstance().getRejectThershold(), + SystemInfo.getInstance().getRejectThreshold(), cacheHitRatioMonitor.getRejectThershold(), delta); assertEquals(
