This is an automated email from the ASF dual-hosted git repository. spricoder pushed a commit to branch feature/memory_zhy in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a02c2924309d408bc069025e69c56c0803828ad3 Author: spricoder <[email protected]> AuthorDate: Tue Sep 24 17:05:41 2024 +0800 init first version of memory manager and memory block --- .../SystemRuntimeOutOfMemoryCriticalException.java | 64 ++++ .../iotdb/commons/memory/IoTMemoryBlock.java | 187 +++++++++++ .../iotdb/commons/memory/IoTMemoryManager.java | 343 +++++++++++++++++++++ 3 files changed, 594 insertions(+) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/memory/SystemRuntimeOutOfMemoryCriticalException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/memory/SystemRuntimeOutOfMemoryCriticalException.java new file mode 100644 index 00000000000..9644b891832 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/memory/SystemRuntimeOutOfMemoryCriticalException.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.exception.memory; + +import java.util.Objects; + +public class SystemRuntimeOutOfMemoryCriticalException extends RuntimeException { + private final long timeStamp; + + public SystemRuntimeOutOfMemoryCriticalException(String message) { + super(message); + this.timeStamp = System.currentTimeMillis(); + } + + public SystemRuntimeOutOfMemoryCriticalException(String message, long timeStamp) { + super(message); + this.timeStamp = timeStamp; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof SystemRuntimeOutOfMemoryCriticalException + && Objects.equals( + getMessage(), ((SystemRuntimeOutOfMemoryCriticalException) obj).getMessage()) + && Objects.equals( + getTimeStamp(), ((SystemRuntimeOutOfMemoryCriticalException) obj).getTimeStamp()); + } + + @Override + public int hashCode() { + return super.hashCode(); + } + + public long getTimeStamp() { + return timeStamp; + } + + @Override + public String toString() { + return "SystemRuntimeOutOfMemoryException{" + + "message='" + + getMessage() + + "', timeStamp=" + + getTimeStamp() + + "}"; + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IoTMemoryBlock.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IoTMemoryBlock.java new file mode 100644 index 00000000000..2c869f34917 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IoTMemoryBlock.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.memory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; +import java.util.function.LongUnaryOperator; + +public class IoTMemoryBlock implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(IoTMemoryBlock.class); + + private final IoTMemoryManager ioTMemoryManager = null; // TODO @spricoder + + private final ReentrantLock lock = new ReentrantLock(); + + private final AtomicLong memoryUsageInBytes = new AtomicLong(0); + + private final AtomicReference<LongUnaryOperator> shrinkMethod = new AtomicReference<>(); + private final AtomicReference<BiConsumer<Long, Long>> shrinkCallback = new AtomicReference<>(); + private final AtomicReference<LongUnaryOperator> expandMethod = new AtomicReference<>(); + private final AtomicReference<BiConsumer<Long, Long>> expandCallback = new AtomicReference<>(); + + private volatile boolean isReleased = false; + + public IoTMemoryBlock(final long memoryUsageInBytes) { + this.memoryUsageInBytes.set(memoryUsageInBytes); + } + + public long getMemoryUsageInBytes() { + return memoryUsageInBytes.get(); + } + + public void setMemoryUsageInBytes(final long memoryUsageInBytes) { + this.memoryUsageInBytes.set(memoryUsageInBytes); + } + + public IoTMemoryBlock setShrinkMethod(final LongUnaryOperator shrinkMethod) { + this.shrinkMethod.set(shrinkMethod); + return this; + } + + public IoTMemoryBlock setShrinkCallback(final BiConsumer<Long, Long> shrinkCallback) { + this.shrinkCallback.set(shrinkCallback); + return this; + } + + public IoTMemoryBlock setExpandMethod(final LongUnaryOperator extendMethod) { + this.expandMethod.set(extendMethod); + return this; + } + + public IoTMemoryBlock setExpandCallback(final BiConsumer<Long, Long> expandCallback) { + this.expandCallback.set(expandCallback); + return this; + } + + boolean shrink() { + if (lock.tryLock()) { + try { + return doShrink(); + } finally { + lock.unlock(); + } + } + return false; + } + + private boolean doShrink() { + if (shrinkMethod.get() == null) { + return false; + } + + final long oldMemorySizeInBytes = memoryUsageInBytes.get(); + final long newMemorySizeInBytes = shrinkMethod.get().applyAsLong(memoryUsageInBytes.get()); + + final long memoryInBytesCanBeReleased = oldMemorySizeInBytes - newMemorySizeInBytes; + if (memoryInBytesCanBeReleased <= 0 + || !ioTMemoryManager.release(this, memoryInBytesCanBeReleased)) { + return false; + } + + if (shrinkCallback.get() != null) { + try { + shrinkCallback.get().accept(oldMemorySizeInBytes, newMemorySizeInBytes); + } catch (Exception e) { + LOGGER.warn("Failed to execute the shrink callback.", e); + } + } + return true; + } + + boolean expand() { + if (lock.tryLock()) { + try { + return doExpand(); + } finally { + lock.unlock(); + } + } + return false; + } + + private boolean doExpand() { + if (expandMethod.get() == null) { + return false; + } + + final long oldMemorySizeInBytes = memoryUsageInBytes.get(); + final long newMemorySizeInBytes = expandMethod.get().applyAsLong(memoryUsageInBytes.get()); + + final long memoryInBytesNeededToBeAllocated = newMemorySizeInBytes - oldMemorySizeInBytes; + if (memoryInBytesNeededToBeAllocated <= 0 + || !ioTMemoryManager.tryAllocate(this, memoryInBytesNeededToBeAllocated)) { + return false; + } + + if (expandCallback.get() != null) { + try { + expandCallback.get().accept(oldMemorySizeInBytes, newMemorySizeInBytes); + } catch (Exception e) { + LOGGER.warn("Failed to execute the expand callback.", e); + } + } + return true; + } + + boolean isReleased() { + return isReleased; + } + + void markAsReleased() { + isReleased = true; + } + + @Override + public String toString() { + return "IoTMemoryBlock{" + + "memoryUsageInBytes=" + + memoryUsageInBytes.get() + + ", isReleased=" + + isReleased + + '}'; + } + + @Override + public void close() { + while (true) { + try { + if (lock.tryLock(50, TimeUnit.MICROSECONDS)) { + try { + ioTMemoryManager.release(this); + break; + } finally { + lock.unlock(); + } + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("Interrupted while waiting for the lock.", e); + } + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IoTMemoryManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IoTMemoryManager.java new file mode 100644 index 00000000000..3ade7ab8097 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IoTMemoryManager.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.memory; + +import org.apache.iotdb.commons.exception.memory.SystemRuntimeOutOfMemoryCriticalException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.LongUnaryOperator; + +public class IoTMemoryManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(IoTMemoryManager.class); + + /** whether enable memory management */ + private final boolean MEMORY_MANAGEMENT_ENABLED; + + /** max retry time of allocating memory */ + private final int MEMORY_ALLOCATE_MAX_RETRIES; + + /** retry interval of allocating memory, unit: ms */ + private final long MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS; + + /** total size of memory manager, unit: bytes */ + private final long TOTAL_MEMORY_SIZE_IN_BYTES; + + /** minimum size of allocating memory, unit: bytes */ + private final long MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES; + + private long usedMemorySizeInBytes; + + private final Set<IoTMemoryBlock> allocatedBlocks = new HashSet<>(); + + public IoTMemoryManager( + boolean is_enable_memory_management, + int memory_size_in_bytes, + int min_allocate_memory_size_in_bytes, + int max_retry_times, + long retry_interval_in_ms) { + this.MEMORY_MANAGEMENT_ENABLED = is_enable_memory_management; + this.TOTAL_MEMORY_SIZE_IN_BYTES = memory_size_in_bytes; + this.MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES = min_allocate_memory_size_in_bytes; + this.MEMORY_ALLOCATE_MAX_RETRIES = max_retry_times; + this.MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS = retry_interval_in_ms; + // TODO @spricoder register period job to expand all + } + + // region Allocate Memory Block + + public synchronized IoTMemoryBlock tryAllocate(long sizeInBytes) { + return tryAllocate(sizeInBytes, currentSize -> currentSize * 2 / 3); + } + + public synchronized IoTMemoryBlock tryAllocate( + long sizeInBytes, LongUnaryOperator customAllocateStrategy) { + if (!MEMORY_MANAGEMENT_ENABLED) { + return new IoTMemoryBlock(sizeInBytes); + } + + if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes) { + return registerMemoryBlock(sizeInBytes); + } + + long sizeToAllocateInBytes = sizeInBytes; + while (sizeToAllocateInBytes > MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES) { + if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeToAllocateInBytes) { + LOGGER.info( + "tryAllocate: allocated memory, " + + "total memory size {} bytes, used memory size {} bytes, " + + "original requested memory size {} bytes," + + "actual requested memory size {} bytes", + TOTAL_MEMORY_SIZE_IN_BYTES, + usedMemorySizeInBytes, + sizeInBytes, + sizeToAllocateInBytes); + return registerMemoryBlock(sizeToAllocateInBytes); + } + + sizeToAllocateInBytes = + Math.max( + customAllocateStrategy.applyAsLong(sizeToAllocateInBytes), + MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES); + } + + if (tryShrink4Allocate(sizeToAllocateInBytes)) { + LOGGER.info( + "tryAllocate: allocated memory, " + + "total memory size {} bytes, used memory size {} bytes, " + + "original requested memory size {} bytes," + + "actual requested memory size {} bytes", + TOTAL_MEMORY_SIZE_IN_BYTES, + usedMemorySizeInBytes, + sizeInBytes, + sizeToAllocateInBytes); + return registerMemoryBlock(sizeToAllocateInBytes); + } else { + LOGGER.warn( + "tryAllocate: failed to allocate memory, " + + "total memory size {} bytes, used memory size {} bytes, " + + "requested memory size {} bytes", + TOTAL_MEMORY_SIZE_IN_BYTES, + usedMemorySizeInBytes, + sizeInBytes); + return registerMemoryBlock(0); + } + } + + public synchronized boolean tryAllocate( + IoTMemoryBlock block, long memoryInBytesNeededToBeAllocated) { + if (!MEMORY_MANAGEMENT_ENABLED || block == null || block.isReleased()) { + return false; + } + + if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= memoryInBytesNeededToBeAllocated) { + usedMemorySizeInBytes += memoryInBytesNeededToBeAllocated; + block.setMemoryUsageInBytes(block.getMemoryUsageInBytes() + memoryInBytesNeededToBeAllocated); + return true; + } + + return false; + } + + public synchronized IoTMemoryBlock forceAllocate(long sizeInBytes) + throws SystemRuntimeOutOfMemoryCriticalException { + if (!MEMORY_MANAGEMENT_ENABLED) { + return new IoTMemoryBlock(sizeInBytes); + } + + for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) { + if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes) { + return registerMemoryBlock(sizeInBytes); + } + + try { + tryShrink4Allocate(sizeInBytes); + this.wait(MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("forceAllocate: interrupted while waiting for available memory", e); + } + } + + throw new SystemRuntimeOutOfMemoryCriticalException( + String.format( + "forceAllocate: failed to allocate memory after %d retries, " + + "total memory size %d bytes, used memory size %d bytes, " + + "requested memory size %d bytes", + MEMORY_ALLOCATE_MAX_RETRIES, + TOTAL_MEMORY_SIZE_IN_BYTES, + usedMemorySizeInBytes, + sizeInBytes)); + } + + /** + * Allocate a {@link IoTMemoryBlock} only if memory already used is less than the specified + * threshold. + * + * @param sizeInBytes size of memory needed to allocate + * @param usedThreshold proportion of memory used, ranged from 0.0 to 1.0 + * @return {@code null} if the proportion of memory already used exceeds {@code usedThreshold}. + * Will return a memory block otherwise. + */ + public synchronized IoTMemoryBlock forceAllocateIfSufficient( + long sizeInBytes, float usedThreshold) { + if (usedThreshold < 0.0f || usedThreshold > 1.0f) { + return null; + } + + if (!MEMORY_MANAGEMENT_ENABLED) { + return new IoTMemoryBlock(sizeInBytes); + } + + if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes + && (float) usedMemorySizeInBytes / TOTAL_MEMORY_SIZE_IN_BYTES < usedThreshold) { + return forceAllocate(sizeInBytes); + } else { + long memoryToShrink = + Math.max( + usedMemorySizeInBytes - (long) (TOTAL_MEMORY_SIZE_IN_BYTES * usedThreshold), + sizeInBytes); + if (tryShrink4Allocate(memoryToShrink)) { + return forceAllocate(sizeInBytes); + } + } + return null; + } + + private IoTMemoryBlock registerMemoryBlock(long sizeInBytes) { + usedMemorySizeInBytes += sizeInBytes; + + final IoTMemoryBlock returnedMemoryBlock = new IoTMemoryBlock(sizeInBytes); + allocatedBlocks.add(returnedMemoryBlock); + return returnedMemoryBlock; + } + + // endregion + + // region Manage Memory Block + + /** + * Force resize memory block to target size + * + * @param block target memory block + * @param targetSize target memory size + */ + public synchronized void forceResize(IoTMemoryBlock block, long targetSize) { + if (block == null || block.isReleased()) { + LOGGER.warn("forceResize: cannot resize a null or released memory block"); + return; + } + + if (!MEMORY_MANAGEMENT_ENABLED) { + block.setMemoryUsageInBytes(targetSize); + return; + } + + final long oldSize = block.getMemoryUsageInBytes(); + + if (oldSize >= targetSize) { + usedMemorySizeInBytes -= oldSize - targetSize; + block.setMemoryUsageInBytes(targetSize); + return; + } + + long sizeInBytes = targetSize - oldSize; + for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) { + if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes) { + usedMemorySizeInBytes += sizeInBytes; + block.setMemoryUsageInBytes(targetSize); + return; + } + + try { + tryShrink4Allocate(sizeInBytes); + this.wait(MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("forceResize: interrupted while waiting for available memory", e); + } + } + + throw new SystemRuntimeOutOfMemoryCriticalException( + String.format( + "forceResize: failed to allocate memory after %d retries, " + + "total memory size %d bytes, used memory size %d bytes, " + + "requested memory size %d bytes", + MEMORY_ALLOCATE_MAX_RETRIES, + TOTAL_MEMORY_SIZE_IN_BYTES, + usedMemorySizeInBytes, + sizeInBytes)); + } + + private boolean tryShrink4Allocate(long sizeInBytes) { + final List<IoTMemoryBlock> shuffledBlocks = new ArrayList<>(allocatedBlocks); + Collections.shuffle(shuffledBlocks); + + while (true) { + boolean hasAtLeastOneBlockShrinkable = false; + for (final IoTMemoryBlock block : shuffledBlocks) { + if (block.shrink()) { + hasAtLeastOneBlockShrinkable = true; + if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes) { + return true; + } + } + } + if (!hasAtLeastOneBlockShrinkable) { + return false; + } + } + } + + public synchronized void tryExpandAllAndCheckConsistency() { + allocatedBlocks.forEach(IoTMemoryBlock::expand); + + long blockSum = allocatedBlocks.stream().mapToLong(IoTMemoryBlock::getMemoryUsageInBytes).sum(); + if (blockSum != usedMemorySizeInBytes) { + LOGGER.warn( + "tryExpandAllAndCheckConsistency: memory usage is not consistent with allocated blocks," + + " usedMemorySizeInBytes is {} but sum of all blocks is {}", + usedMemorySizeInBytes, + blockSum); + } + } + + public synchronized void release(IoTMemoryBlock block) { + if (!MEMORY_MANAGEMENT_ENABLED || block == null || block.isReleased()) { + return; + } + + allocatedBlocks.remove(block); + usedMemorySizeInBytes -= block.getMemoryUsageInBytes(); + block.markAsReleased(); + + this.notifyAll(); + } + + public synchronized boolean release(IoTMemoryBlock block, long sizeInBytes) { + if (!MEMORY_MANAGEMENT_ENABLED || block == null || block.isReleased()) { + return false; + } + + usedMemorySizeInBytes -= sizeInBytes; + block.setMemoryUsageInBytes(block.getMemoryUsageInBytes() - sizeInBytes); + + this.notifyAll(); + + return true; + } + + // endregion + + public long getUsedMemorySizeInBytes() { + return usedMemorySizeInBytes; + } + + public long getTotalMemorySizeInBytes() { + return TOTAL_MEMORY_SIZE_IN_BYTES; + } +}
