This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 1cbef9c67bb Pipe: Optimize Batch and WAL memory allocation algorithms
(#15534) (#15609)
1cbef9c67bb is described below
commit 1cbef9c67bb66ffc60480c3847a3d84a34fe2ebe
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu May 29 18:24:59 2025 +0800
Pipe: Optimize Batch and WAL memory allocation algorithms (#15534) (#15609)
---
.../evolvable/batch/PipeTabletEventBatch.java | 41 +++++-
.../evolvable/batch/PipeTabletEventPlainBatch.java | 41 +-----
.../batch/PipeTabletEventTsFileBatch.java | 10 +-
.../resource/memory/PipeDynamicMemoryBlock.java | 156 +++++++++++++++++++++
.../pipe/resource/memory/PipeMemoryBlockType.java | 2 +
.../db/pipe/resource/memory/PipeMemoryManager.java | 56 ++++++++
.../resource/memory/PipeModelFixedMemoryBlock.java | 125 +++++++++++++++++
.../DynamicMemoryAllocationStrategy.java} | 17 ++-
.../strategy/ThresholdAllocationStrategy.java | 134 ++++++++++++++++++
.../dataregion/wal/utils/WALInsertNodeCache.java | 96 +++++++------
.../apache/iotdb/commons/conf/CommonConfig.java | 115 ++++++++++++++-
.../iotdb/commons/pipe/config/PipeConfig.java | 41 ++++++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 42 ++++++
13 files changed, 775 insertions(+), 101 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
index bd07d0c34ad..9bff91c50ad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -21,6 +21,10 @@ package
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
+import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -36,6 +40,11 @@ import java.util.Objects;
public abstract class PipeTabletEventBatch implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTabletEventBatch.class);
+ private static final PipeModelFixedMemoryBlock PIPE_MODEL_FIXED_MEMORY_BLOCK
=
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateForModelFixedMemoryBlock(
+
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfBatch(),
+ PipeMemoryBlockType.BATCH);
protected final List<EnrichedEvent> events = new ArrayList<>();
@@ -43,11 +52,25 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
private long firstEventProcessingTime = Long.MIN_VALUE;
protected long totalBufferSize = 0;
+ private final PipeDynamicMemoryBlock allocatedMemoryBlock;
protected volatile boolean isClosed = false;
- protected PipeTabletEventBatch(final int maxDelayInMs) {
+ protected PipeTabletEventBatch(final int maxDelayInMs, final long
requestMaxBatchSizeInBytes) {
this.maxDelayInMs = maxDelayInMs;
+
+ // limit in buffer size
+ this.allocatedMemoryBlock =
+
PIPE_MODEL_FIXED_MEMORY_BLOCK.registerPipeBatchMemoryBlock(requestMaxBatchSizeInBytes);
+ allocatedMemoryBlock.setExpandable(false);
+
+ if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
+ LOGGER.info(
+ "PipeTabletEventBatch: the max batch size is adjusted from {} to {}
due to the "
+ + "memory restriction",
+ requestMaxBatchSizeInBytes,
+ getMaxBatchSizeInBytes());
+ }
}
/**
@@ -104,11 +127,17 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
throws WALPipeException, IOException;
public boolean shouldEmit() {
- return totalBufferSize >= getMaxBatchSizeInBytes()
- || System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
+ final long diff = System.currentTimeMillis() - firstEventProcessingTime;
+ if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) {
+ allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double)
diff / maxDelayInMs);
+ return true;
+ }
+ return false;
}
- protected abstract long getMaxBatchSizeInBytes();
+ private long getMaxBatchSizeInBytes() {
+ return allocatedMemoryBlock.getMemoryUsageInBytes();
+ }
public synchronized void onSuccess() {
events.clear();
@@ -124,6 +153,10 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
clearEventsReferenceCount(PipeTabletEventBatch.class.getName());
events.clear();
+
+ if (allocatedMemoryBlock != null) {
+ allocatedMemoryBlock.close();
+ }
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index ea25b325562..292fc018ed7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -23,8 +23,6 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -52,36 +50,11 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
private final List<ByteBuffer> tabletBuffers = new ArrayList<>();
- // limit in buffer size
- private final PipeMemoryBlock allocatedMemoryBlock;
-
// Used to rate limit when transferring data
private final Map<Pair<String, Long>, Long> pipe2BytesAccumulated = new
HashMap<>();
PipeTabletEventPlainBatch(final int maxDelayInMs, final long
requestMaxBatchSizeInBytes) {
- super(maxDelayInMs);
- this.allocatedMemoryBlock =
- PipeDataNodeResourceManager.memory()
- .tryAllocate(requestMaxBatchSizeInBytes)
- .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0))
- .setShrinkCallback(
- (oldMemory, newMemory) ->
- LOGGER.info(
- "The batch size limit has shrunk from {} to {}.",
oldMemory, newMemory))
- .setExpandMethod(
- oldMemory -> Math.min(Math.max(oldMemory, 1) * 2,
requestMaxBatchSizeInBytes))
- .setExpandCallback(
- (oldMemory, newMemory) ->
- LOGGER.info(
- "The batch size limit has expanded from {} to {}.",
oldMemory, newMemory));
-
- if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
- LOGGER.info(
- "PipeTabletEventBatch: the max batch size is adjusted from {} to {}
due to the "
- + "memory restriction",
- requestMaxBatchSizeInBytes,
- getMaxBatchSizeInBytes());
- }
+ super(maxDelayInMs, requestMaxBatchSizeInBytes);
}
@Override
@@ -113,11 +86,6 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
binaryBuffers, insertNodeBuffers, tabletBuffers);
}
- @Override
- protected long getMaxBatchSizeInBytes() {
- return allocatedMemoryBlock.getMemoryUsageInBytes();
- }
-
public Map<Pair<String, Long>, Long> deepCopyPipeName2BytesAccumulated() {
return new HashMap<>(pipe2BytesAccumulated);
}
@@ -156,11 +124,4 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
}
return buffer.limit();
}
-
- @Override
- public synchronized void close() {
- super.close();
-
- allocatedMemoryBlock.close();
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index f2fad1cb0cd..d839f422f03 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -86,8 +86,6 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
private static final String TS_FILE_PREFIX = "tb"; // tb means tablet batch
private final AtomicLong tsFileIdGenerator = new AtomicLong(0);
- private final long maxSizeInBytes;
-
private final Map<Pair<String, Long>, Double> pipeName2WeightMap = new
HashMap<>();
private final List<Tablet> tabletList = new ArrayList<>();
@@ -97,9 +95,8 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
private volatile TsFileWriter fileWriter;
public PipeTabletEventTsFileBatch(final int maxDelayInMs, final long
requestMaxBatchSizeInBytes) {
- super(maxDelayInMs);
+ super(maxDelayInMs, requestMaxBatchSizeInBytes);
- this.maxSizeInBytes = requestMaxBatchSizeInBytes;
try {
this.batchFileBaseDir = getNextBaseDir();
} catch (final Exception e) {
@@ -455,11 +452,6 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
}
}
- @Override
- protected long getMaxBatchSizeInBytes() {
- return maxSizeInBytes;
- }
-
@Override
public synchronized void onSuccess() {
super.onSuccess();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
new file mode 100644
index 00000000000..4e33b871828
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.memory;
+
+import org.apache.tsfile.utils.Pair;
+
+import javax.validation.constraints.NotNull;
+
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+public class PipeDynamicMemoryBlock {
+
+ private final PipeModelFixedMemoryBlock fixedMemoryBlock;
+
+ private boolean isExpandable = true;
+
+ private Consumer<PipeDynamicMemoryBlock> expand = null;
+
+ private volatile boolean released = false;
+
+ private volatile long memoryUsageInBytes;
+
+ private volatile double historyMemoryEfficiency;
+
+ private volatile double currentMemoryEfficiency;
+
+ PipeDynamicMemoryBlock(
+ final @NotNull PipeModelFixedMemoryBlock fixedMemoryBlock, final long
memoryUsageInBytes) {
+ this.memoryUsageInBytes = Math.min(memoryUsageInBytes, 0);
+ this.fixedMemoryBlock = fixedMemoryBlock;
+ }
+
+ public long getMemoryUsageInBytes() {
+ return memoryUsageInBytes;
+ }
+
+ public void setMemoryUsageInBytes(final long memoryUsageInBytes) {
+ this.memoryUsageInBytes = memoryUsageInBytes;
+ }
+
+ public Pair<Double, Double> getMemoryEfficiency() {
+ synchronized (fixedMemoryBlock) {
+ return new Pair<>(historyMemoryEfficiency, currentMemoryEfficiency);
+ }
+ }
+
+ public void setExpandable(boolean expandable) {
+ isExpandable = expandable;
+ }
+
+ public void setExpand(Consumer<PipeDynamicMemoryBlock> expand) {
+ this.expand = expand;
+ }
+
+ public double getMemoryBlockUsageRatio() {
+ return (double) memoryUsageInBytes /
fixedMemoryBlock.getMemoryUsageInBytes();
+ }
+
+ public double getFixedMemoryBlockUsageRatio() {
+ return (double) fixedMemoryBlock.getMemoryAllocatedInBytes()
+ / fixedMemoryBlock.getMemoryUsageInBytes();
+ }
+
+ public long canAllocateMemorySize() {
+ return fixedMemoryBlock.getMemoryUsageInBytes() -
fixedMemoryBlock.getMemoryAllocatedInBytes();
+ }
+
+ public synchronized long getExpectedAverageAllocatedMemorySize() {
+ return fixedMemoryBlock.getMemoryUsageInBytes() /
fixedMemoryBlock.getMemoryBlocks().size();
+ }
+
+ public void updateCurrentMemoryEfficiencyAdjustMem(double
currentMemoryEfficiency) {
+ synchronized (fixedMemoryBlock) {
+ this.historyMemoryEfficiency = this.currentMemoryEfficiency;
+ if (Double.isNaN(currentMemoryEfficiency)
+ || Double.isInfinite(currentMemoryEfficiency)
+ || currentMemoryEfficiency < 0.0) {
+ currentMemoryEfficiency = 0.0;
+ }
+ this.currentMemoryEfficiency = Math.min(currentMemoryEfficiency, 1.0);
+ fixedMemoryBlock.dynamicallyAdjustMemory(this);
+ }
+ }
+
+ public long getFixedMemoryCapacity() {
+ return fixedMemoryBlock.getMemoryUsageInBytes();
+ }
+
+ public void updateMemoryEfficiency(
+ double currentMemoryEfficiency, double historyMemoryEfficiency) {
+ synchronized (fixedMemoryBlock) {
+ if (Double.isNaN(currentMemoryEfficiency)
+ || Double.isInfinite(currentMemoryEfficiency)
+ || currentMemoryEfficiency < 0.0) {
+ currentMemoryEfficiency = 0.0;
+ }
+
+ if (Double.isNaN(historyMemoryEfficiency)
+ || Double.isInfinite(historyMemoryEfficiency)
+ || historyMemoryEfficiency < 0.0) {
+ currentMemoryEfficiency = 0.0;
+ }
+
+ this.historyMemoryEfficiency = Math.min(historyMemoryEfficiency, 1.0);
+ this.currentMemoryEfficiency = Math.min(currentMemoryEfficiency, 1.0);
+ }
+ }
+
+ public Stream<PipeDynamicMemoryBlock> getMemoryBlocks() {
+ return fixedMemoryBlock.getMemoryBlocksStream();
+ }
+
+ public void applyForDynamicMemory(final long memoryUsageInBytes) {
+ fixedMemoryBlock.resetMemoryBlockSize(this, memoryUsageInBytes);
+ }
+
+ public boolean isReleased() {
+ return released;
+ }
+
+ public void close() {
+ if (released) {
+ return;
+ }
+ synchronized (fixedMemoryBlock) {
+ if (!released) {
+ fixedMemoryBlock.releaseMemory(this);
+ released = true;
+ }
+ }
+ }
+
+ void doExpand() {
+ if (isExpandable && expand != null) {
+ expand.accept(this);
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
index 5b626df04c3..846fc7dd1ce 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
@@ -23,4 +23,6 @@ public enum PipeMemoryBlockType {
NORMAL,
TABLET,
TS_FILE,
+ BATCH,
+ WAL
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index dbade8bea9c..34934bef812 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalExc
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import
org.apache.iotdb.db.pipe.resource.memory.strategy.ThresholdAllocationStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,6 +104,18 @@ public class PipeMemoryManager {
* getTotalNonFloatingMemorySizeInBytes();
}
+ public long getAllocatedMemorySizeInBytesOfWAL() {
+ return (long)
+ (PipeConfig.getInstance().getPipeDataStructureWalMemoryProportion()
+ * getTotalNonFloatingMemorySizeInBytes());
+ }
+
+ public long getAllocatedMemorySizeInBytesOfBatch() {
+ return (long)
+ (PipeConfig.getInstance().getPipeDataStructureBatchMemoryProportion()
+ * getTotalNonFloatingMemorySizeInBytes());
+ }
+
public boolean isEnough4TabletParsing() {
return (double) usedMemorySizeInBytesOfTablets + (double)
usedMemorySizeInBytesOfTsFiles
< EXCEED_PROTECT_THRESHOLD *
allowedMaxMemorySizeInBytesOfTabletsAndTsFiles()
@@ -232,6 +245,41 @@ public class PipeMemoryManager {
}
}
+ public PipeModelFixedMemoryBlock forceAllocateForModelFixedMemoryBlock(
+ long fixedSizeInBytes, PipeMemoryBlockType type)
+ throws PipeRuntimeOutOfMemoryCriticalException {
+ if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
+ return new PipeModelFixedMemoryBlock(Long.MAX_VALUE, new
ThresholdAllocationStrategy());
+ }
+
+ if (fixedSizeInBytes == 0) {
+ return (PipeModelFixedMemoryBlock) registerMemoryBlock(0, type);
+ }
+
+ for (int i = 1, size =
PipeConfig.getInstance().getPipeMemoryAllocateMaxRetries();
+ i <= size;
+ i++) {
+ if (getFreeMemorySizeInBytes() >= fixedSizeInBytes) {
+ break;
+ }
+
+ try {
+
Thread.sleep(PipeConfig.getInstance().getPipeMemoryAllocateRetryIntervalInMs());
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("forceAllocateWithRetry: interrupted while waiting for
available memory", ex);
+ }
+ }
+
+ if (getFreeMemorySizeInBytes() < fixedSizeInBytes) {
+ return (PipeModelFixedMemoryBlock)
forceAllocateWithRetry(getFreeMemorySizeInBytes(), type);
+ }
+
+ synchronized (this) {
+ return (PipeModelFixedMemoryBlock)
forceAllocateWithRetry(fixedSizeInBytes, type);
+ }
+ }
+
private PipeMemoryBlock forceAllocateWithRetry(long sizeInBytes,
PipeMemoryBlockType type)
throws PipeRuntimeOutOfMemoryCriticalException {
if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
@@ -240,6 +288,9 @@ public class PipeMemoryManager {
return new PipeTabletMemoryBlock(sizeInBytes);
case TS_FILE:
return new PipeTsFileMemoryBlock(sizeInBytes);
+ case BATCH:
+ case WAL:
+ return new PipeModelFixedMemoryBlock(sizeInBytes, new
ThresholdAllocationStrategy());
default:
return new PipeMemoryBlock(sizeInBytes);
}
@@ -467,6 +518,11 @@ public class PipeMemoryManager {
case TS_FILE:
returnedMemoryBlock = new PipeTsFileMemoryBlock(sizeInBytes);
break;
+ case BATCH:
+ case WAL:
+ returnedMemoryBlock =
+ new PipeModelFixedMemoryBlock(sizeInBytes, new
ThresholdAllocationStrategy());
+ break;
default:
returnedMemoryBlock = new PipeMemoryBlock(sizeInBytes);
break;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeModelFixedMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeModelFixedMemoryBlock.java
new file mode 100644
index 00000000000..647fb81a4b9
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeModelFixedMemoryBlock.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.memory;
+
+import
org.apache.iotdb.db.pipe.resource.memory.strategy.DynamicMemoryAllocationStrategy;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+
+public class PipeModelFixedMemoryBlock extends PipeFixedMemoryBlock {
+
+ private final Set<PipeDynamicMemoryBlock> memoryBlocks =
+ Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+ private final DynamicMemoryAllocationStrategy allocationStrategy;
+
+ private volatile long memoryAllocatedInBytes;
+
+ public PipeModelFixedMemoryBlock(
+ final long memoryUsageInBytes, final DynamicMemoryAllocationStrategy
allocationStrategy) {
+ super(memoryUsageInBytes);
+ this.memoryAllocatedInBytes = 0;
+ this.allocationStrategy = allocationStrategy;
+ }
+
+ public synchronized PipeDynamicMemoryBlock registerPipeBatchMemoryBlock(
+ final long memorySizeInBytes) {
+ final PipeDynamicMemoryBlock memoryBlock = new
PipeDynamicMemoryBlock(this, 0);
+ memoryBlocks.add(memoryBlock);
+ if (memorySizeInBytes != 0) {
+ resetMemoryBlockSize(memoryBlock, memorySizeInBytes);
+ double e = (double) getMemoryUsageInBytes() / memorySizeInBytes;
+ memoryBlock.updateMemoryEfficiency(e, e);
+ return memoryBlock;
+ }
+
+ memoryBlock.updateMemoryEfficiency(0.0, 0.0);
+ return memoryBlock;
+ }
+
+ @Override
+ public synchronized boolean expand() {
+ // Ensure that the memory block that gets most of the memory is released
first, which can reduce
+ // the jitter of memory allocationIf the memory block is not expanded, it
will not be expanded
+ // again.This function not only completes the expansion but also the
reduction.
+ memoryBlocks.stream()
+ .sorted((a, b) -> Long.compare(b.getMemoryUsageInBytes(),
a.getMemoryUsageInBytes()))
+ .forEach(PipeDynamicMemoryBlock::doExpand);
+ return false;
+ }
+
+ public long getMemoryAllocatedInBytes() {
+ return memoryAllocatedInBytes;
+ }
+
+ public synchronized Set<PipeDynamicMemoryBlock> getMemoryBlocks() {
+ return memoryBlocks;
+ }
+
+ synchronized void releaseMemory(final PipeDynamicMemoryBlock memoryBlock) {
+ resetMemoryBlockSize(memoryBlock, 0);
+ memoryBlocks.remove(memoryBlock);
+ }
+
+ synchronized void dynamicallyAdjustMemory(final PipeDynamicMemoryBlock
block) {
+ if (this.isReleased() || block.isReleased() ||
!memoryBlocks.contains(block)) {
+ throw new IllegalStateException("The memory block has been released");
+ }
+ allocationStrategy.dynamicallyAdjustMemory(block);
+ }
+
+ synchronized void resetMemoryBlockSize(
+ final PipeDynamicMemoryBlock block, final long memorySizeInBytes) {
+ if (this.isReleased() || block.isReleased() ||
!memoryBlocks.contains(block)) {
+ throw new IllegalStateException("The memory block has been released");
+ }
+
+ final long diff = memorySizeInBytes - block.getMemoryUsageInBytes();
+
+ // If the capacity is expanded, determine whether it will exceed the
maximum value of the fixed
+ // module
+ if (getMemoryUsageInBytes() - memoryAllocatedInBytes < diff) {
+ // Pay attention to the order of calls, otherwise it will cause resource
leakage
+ block.setMemoryUsageInBytes(
+ block.getMemoryUsageInBytes() + getMemoryUsageInBytes() -
memoryAllocatedInBytes);
+ memoryAllocatedInBytes = getMemoryUsageInBytes();
+ return;
+ }
+
+ memoryAllocatedInBytes = memoryAllocatedInBytes + diff;
+ block.setMemoryUsageInBytes(memorySizeInBytes);
+ }
+
+ Stream<PipeDynamicMemoryBlock> getMemoryBlocksStream() {
+ if (isReleased()) {
+ throw new IllegalStateException("The memory block has been released");
+ }
+ return memoryBlocks.stream();
+ }
+
+ @Override
+ public synchronized void close() {
+ memoryBlocks.forEach(PipeDynamicMemoryBlock::close);
+ super.close();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/DynamicMemoryAllocationStrategy.java
similarity index 50%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/DynamicMemoryAllocationStrategy.java
index 5b626df04c3..8e5ba9af053 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/DynamicMemoryAllocationStrategy.java
@@ -17,10 +17,17 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.resource.memory;
+package org.apache.iotdb.db.pipe.resource.memory.strategy;
-public enum PipeMemoryBlockType {
- NORMAL,
- TABLET,
- TS_FILE,
+import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
+
+// Now let's define the operation memory behavior: Producers produce memory,
consumers consume
+// memory, and in order to ensure that consumers do not encounter back
pressure, the memory that
+// consumers need to use is allocated in advance. Consumer instances obtain
their expected memory
+// through allocation strategies, and the total memory of all consumer
instances must not be greater
+// than the pre-allocated memory. The memory allocation algorithm is to adjust
the memory of
+// consumers so that the consumption rate can reach the optimal
+public interface DynamicMemoryAllocationStrategy {
+
+ void dynamicallyAdjustMemory(PipeDynamicMemoryBlock dynamicMemoryBlock);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/ThresholdAllocationStrategy.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/ThresholdAllocationStrategy.java
new file mode 100644
index 00000000000..72a390f799e
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/ThresholdAllocationStrategy.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.memory.strategy;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+// The algorithm is optimized for different scenarios: The following describes
the behavior of the
+// algorithm in different scenarios:
+// 1. When the memory is large enough, it will try to allocate memory to the
memory block
+// 2. When the memory is insufficient, the algorithm will try to ensure that
the memory with a
+// relatively large memory share is released
+public class ThresholdAllocationStrategy implements
DynamicMemoryAllocationStrategy {
+
+ private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
+
+ @Override
+ public void dynamicallyAdjustMemory(final PipeDynamicMemoryBlock
dynamicMemoryBlock) {
+ final double deficitRatio = calculateDeficitRatio(dynamicMemoryBlock);
+ final long oldMemoryUsageInBytes =
dynamicMemoryBlock.getMemoryUsageInBytes();
+ final long expectedMemory = (long) (oldMemoryUsageInBytes / deficitRatio);
+ final double memoryBlockUsageRatio =
dynamicMemoryBlock.getMemoryBlockUsageRatio();
+ final long maximumMemoryIncrease =
+ (long)
+ (dynamicMemoryBlock.getFixedMemoryCapacity()
+ *
PIPE_CONFIG.getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio());
+
+ // Avoid overflow and infinite values
+ if (deficitRatio <= 0.0 || oldMemoryUsageInBytes == 0 || expectedMemory ==
0) {
+ dynamicMemoryBlock.applyForDynamicMemory(maximumMemoryIncrease);
+ final double efficiencyRatio =
+ (double) dynamicMemoryBlock.getMemoryUsageInBytes() /
maximumMemoryIncrease;
+ dynamicMemoryBlock.updateMemoryEfficiency(efficiencyRatio,
efficiencyRatio);
+ return;
+ }
+
+ // No matter what, give priority to applying for memory use, and adjust
the memory size when the
+ // memory is insufficient
+ final double lowUsageThreshold =
+ PIPE_CONFIG.getPipeThresholdAllocationStrategyLowUsageThreshold();
+ if (dynamicMemoryBlock.getFixedMemoryBlockUsageRatio()
+ <
PIPE_CONFIG.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold()) {
+ if (deficitRatio >= 1.0) {
+ return;
+ }
+
+ final long maxAvailableMemory =
+ Math.min(expectedMemory, dynamicMemoryBlock.canAllocateMemorySize());
+ long newMemoryRequest;
+
+ // Need to ensure that you get memory in smaller chunks and get more
memory faster
+ if (memoryBlockUsageRatio > lowUsageThreshold) {
+ newMemoryRequest =
+ Math.min(oldMemoryUsageInBytes + oldMemoryUsageInBytes / 2,
maxAvailableMemory);
+ } else {
+ newMemoryRequest = Math.min(oldMemoryUsageInBytes * 2,
maxAvailableMemory);
+ }
+
+ dynamicMemoryBlock.applyForDynamicMemory(newMemoryRequest);
+ final double efficiencyRatio =
+ dynamicMemoryBlock.getMemoryUsageInBytes() / (double) expectedMemory;
+ dynamicMemoryBlock.updateMemoryEfficiency(efficiencyRatio,
efficiencyRatio);
+ return;
+ }
+
+ // Entering this logic means that the memory is insufficient and the
memory allocation needs to
+ // be adjusted
+ final AtomicBoolean isMemoryNotEnough = new AtomicBoolean(false);
+ final double averageDeficitRatio =
+ dynamicMemoryBlock
+ .getMemoryBlocks()
+ .mapToDouble(
+ block -> {
+ double ratio = calculateDeficitRatio(block);
+ if (block.getMemoryUsageInBytes() == 0 || ratio == 0.0) {
+ isMemoryNotEnough.set(true);
+ }
+ return ratio;
+ })
+ .average()
+ .orElse(1.0);
+
+ final double adjustmentThreshold =
PIPE_CONFIG.getPipeDynamicMemoryAdjustmentThreshold();
+ // When memory is insufficient, try to ensure that smaller memory blocks
apply for less memory,
+ // and larger memory blocks release more memory.
+ final double diff =
+ isMemoryNotEnough.get() && averageDeficitRatio > 2 *
adjustmentThreshold
+ ? averageDeficitRatio - deficitRatio - adjustmentThreshold
+ : averageDeficitRatio - deficitRatio;
+
+ if (Math.abs(diff) >
PIPE_CONFIG.getPipeDynamicMemoryAdjustmentThreshold()) {
+ final long mem = (long) ((dynamicMemoryBlock.getMemoryUsageInBytes() /
deficitRatio) * diff);
+
dynamicMemoryBlock.applyForDynamicMemory(dynamicMemoryBlock.getMemoryUsageInBytes()
+ mem);
+ if (oldMemoryUsageInBytes != dynamicMemoryBlock.getMemoryUsageInBytes())
{
+ final double efficiencyRatio =
+ dynamicMemoryBlock.getMemoryUsageInBytes() / (double)
expectedMemory;
+ dynamicMemoryBlock.updateMemoryEfficiency(efficiencyRatio,
efficiencyRatio);
+ }
+ } else if (memoryBlockUsageRatio > lowUsageThreshold
+ && memoryBlockUsageRatio >
dynamicMemoryBlock.getExpectedAverageAllocatedMemorySize()) {
+ // If there is insufficient memory, some memory must be released
+ dynamicMemoryBlock.applyForDynamicMemory(oldMemoryUsageInBytes / 2);
+ dynamicMemoryBlock.updateMemoryEfficiency(deficitRatio / 2, deficitRatio
/ 2);
+ }
+ }
+
+ private double calculateDeficitRatio(final PipeDynamicMemoryBlock block) {
+ final Pair<Double, Double> memoryEfficiency = block.getMemoryEfficiency();
+ double pipeDynamicMemoryHistoryWeight =
PIPE_CONFIG.getPipeDynamicMemoryHistoryWeight();
+ return (1 - pipeDynamicMemoryHistoryWeight) * memoryEfficiency.getRight()
+ + pipeDynamicMemoryHistoryWeight * memoryEfficiency.getLeft();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 983363689c7..b9419faf695 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -26,7 +26,9 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.metric.overview.PipeWALInsertNodeCacheMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
+import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
@@ -61,7 +63,14 @@ public class WALInsertNodeCache {
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
- private final PipeMemoryBlock allocatedMemoryBlock;
+ private static final PipeModelFixedMemoryBlock WAL_MODEL_FIXED_MEMORY =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateForModelFixedMemoryBlock(
+
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfWAL(),
+ PipeMemoryBlockType.WAL);
+
+ private final PipeDynamicMemoryBlock memoryBlock;
+
// Used to adjust the memory usage of the cache
private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
private final AtomicBoolean isBatchLoadEnabled = new AtomicBoolean(true);
@@ -82,28 +91,12 @@ public class WALInsertNodeCache {
* CONFIG.getWalFileSizeThresholdInByte()
/ CONFIG.getDataRegionNum(),
0.5 * CONFIG.getAllocateMemoryForPipe() /
CONFIG.getDataRegionNum());
- allocatedMemoryBlock =
- PipeDataNodeResourceManager.memory()
- .tryAllocate(requestedAllocateSize)
- .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
- .setExpandMethod(
- oldMemory -> Math.min(Math.max(oldMemory, 1) * 2,
requestedAllocateSize))
- .setExpandCallback(
- (oldMemory, newMemory) -> {
- memoryUsageCheatFactor.updateAndGet(
- factor -> factor / ((double) newMemory / oldMemory));
- isBatchLoadEnabled.set(newMemory >=
CONFIG.getWalFileSizeThresholdInByte());
- LOGGER.info(
- "WALInsertNodeCache.allocatedMemoryBlock of dataRegion
{} has expanded from {} to {}.",
- dataRegionId,
- oldMemory,
- newMemory);
- });
+ memoryBlock =
WAL_MODEL_FIXED_MEMORY.registerPipeBatchMemoryBlock(requestedAllocateSize);
isBatchLoadEnabled.set(
- allocatedMemoryBlock.getMemoryUsageInBytes() >=
CONFIG.getWalFileSizeThresholdInByte());
+ memoryBlock.getMemoryUsageInBytes() >=
CONFIG.getWalFileSizeThresholdInByte());
lruCache =
Caffeine.newBuilder()
- .maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
+ .maximumWeight(requestedAllocateSize)
.weigher(
(Weigher<WALEntryPosition, Pair<ByteBuffer, InsertNode>>)
(position, pair) -> {
@@ -124,30 +117,51 @@ public class WALInsertNodeCache {
})
.recordStats()
.build(new WALInsertNodeCacheLoader());
- allocatedMemoryBlock.setShrinkCallback(
- (oldMemory, newMemory) -> {
- memoryUsageCheatFactor.updateAndGet(factor -> factor * ((double)
oldMemory / newMemory));
- isBatchLoadEnabled.set(newMemory >=
CONFIG.getWalFileSizeThresholdInByte());
- LOGGER.info(
- "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has
shrunk from {} to {}.",
- dataRegionId,
- oldMemory,
- newMemory);
- if (CONFIG.getWALCacheShrinkClearEnabled()) {
- try {
- lruCache.cleanUp();
- } catch (Exception e) {
- LOGGER.warn(
- "Failed to clear WALInsertNodeCache for dataRegion ID: {}.",
dataRegionId, e);
- return;
- }
- LOGGER.info(
- "Successfully cleared WALInsertNodeCache for dataRegion ID:
{}.", dataRegionId);
+
+ memoryBlock.setExpandable(true);
+ memoryBlock.setExpand(
+ memoryBlock -> {
+ final long oldMemory = memoryBlock.getMemoryUsageInBytes();
+
memoryBlock.updateCurrentMemoryEfficiencyAdjustMem(lruCache.stats().hitRate());
+ final long newMemory = memoryBlock.getMemoryUsageInBytes();
+ if (newMemory > oldMemory) {
+ setExpandCallback(oldMemory, newMemory, dataRegionId);
+ } else if (newMemory < oldMemory) {
+ shrinkCallback(oldMemory, newMemory, dataRegionId);
}
});
PipeWALInsertNodeCacheMetrics.getInstance().register(this, dataRegionId);
}
+ private void setExpandCallback(long oldMemory, long newMemory, Integer
dataRegionId) {
+ memoryUsageCheatFactor.updateAndGet(factor -> factor / ((double) newMemory
/ oldMemory));
+ isBatchLoadEnabled.set(newMemory >=
CONFIG.getWalFileSizeThresholdInByte());
+ LOGGER.info(
+ "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has expanded
from {} to {}.",
+ dataRegionId,
+ oldMemory,
+ newMemory);
+ }
+
+ private void shrinkCallback(long oldMemory, long newMemory, Integer
dataRegionId) {
+ memoryUsageCheatFactor.updateAndGet(factor -> factor * ((double) oldMemory
/ newMemory));
+ isBatchLoadEnabled.set(newMemory >=
CONFIG.getWalFileSizeThresholdInByte());
+ LOGGER.info(
+ "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has shrunk
from {} to {}.",
+ dataRegionId,
+ oldMemory,
+ newMemory);
+ if (CONFIG.getWALCacheShrinkClearEnabled()) {
+ try {
+ lruCache.cleanUp();
+ } catch (Exception e) {
+ LOGGER.warn("Failed to clear WALInsertNodeCache for dataRegion ID:
{}.", dataRegionId, e);
+ return;
+ }
+ LOGGER.info("Successfully cleared WALInsertNodeCache for dataRegion ID:
{}.", dataRegionId);
+ }
+ }
+
/////////////////////////// Getter & Setter ///////////////////////////
public InsertNode getInsertNode(final WALEntryPosition position) {
@@ -373,7 +387,7 @@ public class WALInsertNodeCache {
@TestOnly
public void clear() {
lruCache.invalidateAll();
- allocatedMemoryBlock.close();
+ memoryBlock.close();
memTablesNeedSearch.clear();
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index c3f2e0f4a11..4a1c9ea104c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -216,8 +216,10 @@ public class CommonConfig {
private int pipeDataStructureTabletRowSize = 2048;
private int pipeDataStructureTabletSizeInBytes = 2097152;
- private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold =
0.4;
- private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold =
0.4;
+ private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold =
0.2;
+ private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold =
0.2;
+ private double pipeDataStructureWalMemoryProportion = 0.2;
+ private double PipeDataStructureBatchMemoryProportion = 0.2;
private double pipeTotalFloatingMemoryProportion = 0.2;
private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount =
10_000;
@@ -302,6 +304,11 @@ public class CommonConfig {
private PipeRemainingTimeRateAverageTime
pipeRemainingTimeCommitRateAverageTime =
PipeRemainingTimeRateAverageTime.MEAN;
private double pipeTsFileScanParsingThreshold = 0.05;
+ private double pipeDynamicMemoryHistoryWeight = 0.5;
+ private double pipeDynamicMemoryAdjustmentThreshold = 0.05;
+ private double pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio =
0.1d;
+ private double pipeThresholdAllocationStrategyLowUsageThreshold = 0.2d;
+ private double pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold
= 0.8d;
private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8
minutes
private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L;
// 3 minutes
@@ -841,6 +848,34 @@ public class CommonConfig {
pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold);
}
+ public double getPipeDataStructureWalMemoryProportion() {
+ return pipeDataStructureWalMemoryProportion;
+ }
+
+ public void setPipeDataStructureWalMemoryProportion(double
pipeDataStructureWalMemoryProportion) {
+ if (this.pipeDataStructureWalMemoryProportion ==
pipeDataStructureWalMemoryProportion) {
+ return;
+ }
+ this.pipeDataStructureWalMemoryProportion =
pipeDataStructureWalMemoryProportion;
+ logger.info(
+ "pipeDataStructureWalMemoryProportion is set to {}.",
pipeDataStructureWalMemoryProportion);
+ }
+
+ public double getPipeDataStructureBatchMemoryProportion() {
+ return PipeDataStructureBatchMemoryProportion;
+ }
+
+ public void setPipeDataStructureBatchMemoryProportion(
+ double PipeDataStructureBatchMemoryProportion) {
+ if (this.PipeDataStructureBatchMemoryProportion ==
PipeDataStructureBatchMemoryProportion) {
+ return;
+ }
+ this.PipeDataStructureBatchMemoryProportion =
PipeDataStructureBatchMemoryProportion;
+ logger.info(
+ "PipeDataStructureBatchMemoryProportion is set to {}.",
+ PipeDataStructureBatchMemoryProportion);
+ }
+
public double getPipeTotalFloatingMemoryProportion() {
return pipeTotalFloatingMemoryProportion;
}
@@ -1824,6 +1859,82 @@ public class CommonConfig {
logger.info("pipeTsFileScanParsingThreshold is set to {}",
pipeTsFileScanParsingThreshold);
}
+ public double getPipeDynamicMemoryHistoryWeight() {
+ return pipeDynamicMemoryHistoryWeight;
+ }
+
+ public void setPipeDynamicMemoryHistoryWeight(double
pipeDynamicMemoryHistoryWeight) {
+ if (this.pipeDynamicMemoryHistoryWeight == pipeDynamicMemoryHistoryWeight)
{
+ return;
+ }
+ this.pipeDynamicMemoryHistoryWeight = pipeDynamicMemoryHistoryWeight;
+ logger.info("PipeDynamicMemoryHistoryWeight is set to {}",
pipeDynamicMemoryHistoryWeight);
+ }
+
+ public double getPipeDynamicMemoryAdjustmentThreshold() {
+ return pipeDynamicMemoryAdjustmentThreshold;
+ }
+
+ public void setPipeDynamicMemoryAdjustmentThreshold(double
pipeDynamicMemoryAdjustmentThreshold) {
+ if (this.pipeDynamicMemoryAdjustmentThreshold ==
pipeDynamicMemoryAdjustmentThreshold) {
+ return;
+ }
+ this.pipeDynamicMemoryAdjustmentThreshold =
pipeDynamicMemoryAdjustmentThreshold;
+ logger.info(
+ "pipeDynamicMemoryAdjustmentThreshold is set to {}",
pipeDynamicMemoryAdjustmentThreshold);
+ }
+
+ public double
getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio() {
+ return pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio;
+ }
+
+ public void setPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio(
+ double pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio) {
+ if (this.pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio
+ == pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio) {
+ return;
+ }
+ this.pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio =
+ pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio;
+ logger.info(
+ "pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio is set to
{}",
+ pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio);
+ }
+
+ public double getPipeThresholdAllocationStrategyLowUsageThreshold() {
+ return pipeThresholdAllocationStrategyLowUsageThreshold;
+ }
+
+ public void setPipeThresholdAllocationStrategyLowUsageThreshold(
+ double pipeThresholdAllocationStrategyLowUsageThreshold) {
+ if (this.pipeThresholdAllocationStrategyLowUsageThreshold
+ == pipeThresholdAllocationStrategyLowUsageThreshold) {
+ return;
+ }
+ this.pipeThresholdAllocationStrategyLowUsageThreshold =
+ pipeThresholdAllocationStrategyLowUsageThreshold;
+ logger.info(
+ "pipeMemoryBlockLowUsageThreshold is set to {}",
+ pipeThresholdAllocationStrategyLowUsageThreshold);
+ }
+
+ public double
getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold() {
+ return pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold;
+ }
+
+ public void setPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold(
+ double pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold) {
+ if (this.pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold
+ == pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold) {
+ return;
+ }
+ this.pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold =
+ pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold;
+ logger.info(
+ "pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold is set
to {}",
+ pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold);
+ }
+
public double getPipeAllSinksRateLimitBytesPerSecond() {
return pipeAllSinksRateLimitBytesPerSecond;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index c08fab09aff..63698dc46cd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -79,6 +79,14 @@ public class PipeConfig {
return
COMMON_CONFIG.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold();
}
+ public double getPipeDataStructureWalMemoryProportion() {
+ return COMMON_CONFIG.getPipeDataStructureWalMemoryProportion();
+ }
+
+ public double getPipeDataStructureBatchMemoryProportion() {
+ return COMMON_CONFIG.getPipeDataStructureBatchMemoryProportion();
+ }
+
public double getPipeTotalFloatingMemoryProportion() {
return COMMON_CONFIG.getPipeTotalFloatingMemoryProportion();
}
@@ -223,6 +231,26 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeTsFileScanParsingThreshold();
}
+ public double getPipeDynamicMemoryHistoryWeight() {
+ return COMMON_CONFIG.getPipeDynamicMemoryHistoryWeight();
+ }
+
+ public double getPipeDynamicMemoryAdjustmentThreshold() {
+ return COMMON_CONFIG.getPipeDynamicMemoryAdjustmentThreshold();
+ }
+
+ public double
getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio() {
+ return
COMMON_CONFIG.getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio();
+ }
+
+ public double getPipeThresholdAllocationStrategyLowUsageThreshold() {
+ return COMMON_CONFIG.getPipeThresholdAllocationStrategyLowUsageThreshold();
+ }
+
+ public double
getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold() {
+ return
COMMON_CONFIG.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold();
+ }
+
/////////////////////////////// Meta Consistency
///////////////////////////////
public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -487,6 +515,19 @@ public class PipeConfig {
"PipeRemainingTimeCommitRateAverageTime: {}",
getPipeRemainingTimeCommitRateAverageTime());
LOGGER.info("PipeTsFileScanParsingThreshold(): {}",
getPipeTsFileScanParsingThreshold());
+ LOGGER.info("PipeDynamicMemoryHistoryWeight: {}",
getPipeDynamicMemoryHistoryWeight());
+ LOGGER.info(
+ "PipeDynamicMemoryAdjustmentThreshold: {}",
getPipeDynamicMemoryAdjustmentThreshold());
+ LOGGER.info(
+ "PipeThresholdAllocationStrategyMaximumMemoryIncrementRatio: {}",
+ getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio());
+ LOGGER.info(
+ "PipeThresholdAllocationStrategyLowUsageThreshold: {}",
+ getPipeThresholdAllocationStrategyLowUsageThreshold());
+ LOGGER.info(
+ "PipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold: {}",
+ getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold());
+
LOGGER.info(
"PipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold: {}",
getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index f25c285ae5a..ea2829172ce 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -228,6 +228,16 @@ public class PipeDescriptor {
"pipe_data_structure_ts_file_memory_block_allocation_reject_threshold",
String.valueOf(
config.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold()))));
+ config.setPipeDataStructureWalMemoryProportion(
+ Double.parseDouble(
+ properties.getProperty(
+ "pipe_data_structure_wal_memory_proportion",
+
String.valueOf(config.getPipeDataStructureWalMemoryProportion()))));
+ config.setPipeDataStructureBatchMemoryProportion(
+ Double.parseDouble(
+ properties.getProperty(
+ "pipe_data_structure_batch_memory_proportion",
+
String.valueOf(config.getPipeDataStructureBatchMemoryProportion()))));
config.setPipeTotalFloatingMemoryProportion(
Double.parseDouble(
properties.getProperty(
@@ -520,6 +530,38 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_tsfile_scan_parsing_threshold",
String.valueOf(config.getPipeTsFileScanParsingThreshold()))));
+
+ config.setPipeDynamicMemoryHistoryWeight(
+ Double.parseDouble(
+ properties.getProperty(
+ "pipe_dynamic_memory_history_weight",
+ String.valueOf(config.getPipeDynamicMemoryHistoryWeight()))));
+
+ config.setPipeDynamicMemoryAdjustmentThreshold(
+ Double.parseDouble(
+ properties.getProperty(
+ "pipe_dynamic_memory_adjustment_threshold",
+
String.valueOf(config.getPipeDynamicMemoryAdjustmentThreshold()))));
+
+ config.setPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio(
+ Double.parseDouble(
+ properties.getProperty(
+
"pipe_threshold_allocation_strategy_maximum_memory_increment_ratio",
+ String.valueOf(
+
config.getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio()))));
+
+ config.setPipeThresholdAllocationStrategyLowUsageThreshold(
+ Double.parseDouble(
+ properties.getProperty(
+ "pipe_threshold_allocation_strategy_low_usage_threshold",
+
String.valueOf(config.getPipeThresholdAllocationStrategyLowUsageThreshold()))));
+
+ config.setPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold(
+ Double.parseDouble(
+ properties.getProperty(
+ "pipe_threshold_allocation_strategy_high_usage_threshold",
+ String.valueOf(
+
config.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold()))));
}
public static void loadPipeExternalConfig(