This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 1cbef9c67bb Pipe: Optimize Batch and WAL memory allocation algorithms 
(#15534) (#15609)
1cbef9c67bb is described below

commit 1cbef9c67bb66ffc60480c3847a3d84a34fe2ebe
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu May 29 18:24:59 2025 +0800

    Pipe: Optimize Batch and WAL memory allocation algorithms (#15534) (#15609)
---
 .../evolvable/batch/PipeTabletEventBatch.java      |  41 +++++-
 .../evolvable/batch/PipeTabletEventPlainBatch.java |  41 +-----
 .../batch/PipeTabletEventTsFileBatch.java          |  10 +-
 .../resource/memory/PipeDynamicMemoryBlock.java    | 156 +++++++++++++++++++++
 .../pipe/resource/memory/PipeMemoryBlockType.java  |   2 +
 .../db/pipe/resource/memory/PipeMemoryManager.java |  56 ++++++++
 .../resource/memory/PipeModelFixedMemoryBlock.java | 125 +++++++++++++++++
 .../DynamicMemoryAllocationStrategy.java}          |  17 ++-
 .../strategy/ThresholdAllocationStrategy.java      | 134 ++++++++++++++++++
 .../dataregion/wal/utils/WALInsertNodeCache.java   |  96 +++++++------
 .../apache/iotdb/commons/conf/CommonConfig.java    | 115 ++++++++++++++-
 .../iotdb/commons/pipe/config/PipeConfig.java      |  41 ++++++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  42 ++++++
 13 files changed, 775 insertions(+), 101 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
index bd07d0c34ad..9bff91c50ad 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -21,6 +21,10 @@ package 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
+import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -36,6 +40,11 @@ import java.util.Objects;
 public abstract class PipeTabletEventBatch implements AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTabletEventBatch.class);
+  private static final PipeModelFixedMemoryBlock PIPE_MODEL_FIXED_MEMORY_BLOCK 
=
+      PipeDataNodeResourceManager.memory()
+          .forceAllocateForModelFixedMemoryBlock(
+              
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfBatch(),
+              PipeMemoryBlockType.BATCH);
 
   protected final List<EnrichedEvent> events = new ArrayList<>();
 
@@ -43,11 +52,25 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
   private long firstEventProcessingTime = Long.MIN_VALUE;
 
   protected long totalBufferSize = 0;
+  private final PipeDynamicMemoryBlock allocatedMemoryBlock;
 
   protected volatile boolean isClosed = false;
 
-  protected PipeTabletEventBatch(final int maxDelayInMs) {
+  protected PipeTabletEventBatch(final int maxDelayInMs, final long 
requestMaxBatchSizeInBytes) {
     this.maxDelayInMs = maxDelayInMs;
+
+    // limit in buffer size
+    this.allocatedMemoryBlock =
+        
PIPE_MODEL_FIXED_MEMORY_BLOCK.registerPipeBatchMemoryBlock(requestMaxBatchSizeInBytes);
+    allocatedMemoryBlock.setExpandable(false);
+
+    if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
+      LOGGER.info(
+          "PipeTabletEventBatch: the max batch size is adjusted from {} to {} 
due to the "
+              + "memory restriction",
+          requestMaxBatchSizeInBytes,
+          getMaxBatchSizeInBytes());
+    }
   }
 
   /**
@@ -104,11 +127,17 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
       throws WALPipeException, IOException;
 
   public boolean shouldEmit() {
-    return totalBufferSize >= getMaxBatchSizeInBytes()
-        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
+    final long diff = System.currentTimeMillis() - firstEventProcessingTime;
+    if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) {
+      allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double) 
diff / maxDelayInMs);
+      return true;
+    }
+    return false;
   }
 
-  protected abstract long getMaxBatchSizeInBytes();
+  private long getMaxBatchSizeInBytes() {
+    return allocatedMemoryBlock.getMemoryUsageInBytes();
+  }
 
   public synchronized void onSuccess() {
     events.clear();
@@ -124,6 +153,10 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
 
     clearEventsReferenceCount(PipeTabletEventBatch.class.getName());
     events.clear();
+
+    if (allocatedMemoryBlock != null) {
+      allocatedMemoryBlock.close();
+    }
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index ea25b325562..292fc018ed7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -23,8 +23,6 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -52,36 +50,11 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
   private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
   private final List<ByteBuffer> tabletBuffers = new ArrayList<>();
 
-  // limit in buffer size
-  private final PipeMemoryBlock allocatedMemoryBlock;
-
   // Used to rate limit when transferring data
   private final Map<Pair<String, Long>, Long> pipe2BytesAccumulated = new 
HashMap<>();
 
   PipeTabletEventPlainBatch(final int maxDelayInMs, final long 
requestMaxBatchSizeInBytes) {
-    super(maxDelayInMs);
-    this.allocatedMemoryBlock =
-        PipeDataNodeResourceManager.memory()
-            .tryAllocate(requestMaxBatchSizeInBytes)
-            .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0))
-            .setShrinkCallback(
-                (oldMemory, newMemory) ->
-                    LOGGER.info(
-                        "The batch size limit has shrunk from {} to {}.", 
oldMemory, newMemory))
-            .setExpandMethod(
-                oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, 
requestMaxBatchSizeInBytes))
-            .setExpandCallback(
-                (oldMemory, newMemory) ->
-                    LOGGER.info(
-                        "The batch size limit has expanded from {} to {}.", 
oldMemory, newMemory));
-
-    if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
-      LOGGER.info(
-          "PipeTabletEventBatch: the max batch size is adjusted from {} to {} 
due to the "
-              + "memory restriction",
-          requestMaxBatchSizeInBytes,
-          getMaxBatchSizeInBytes());
-    }
+    super(maxDelayInMs, requestMaxBatchSizeInBytes);
   }
 
   @Override
@@ -113,11 +86,6 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
         binaryBuffers, insertNodeBuffers, tabletBuffers);
   }
 
-  @Override
-  protected long getMaxBatchSizeInBytes() {
-    return allocatedMemoryBlock.getMemoryUsageInBytes();
-  }
-
   public Map<Pair<String, Long>, Long> deepCopyPipeName2BytesAccumulated() {
     return new HashMap<>(pipe2BytesAccumulated);
   }
@@ -156,11 +124,4 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
     }
     return buffer.limit();
   }
-
-  @Override
-  public synchronized void close() {
-    super.close();
-
-    allocatedMemoryBlock.close();
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index f2fad1cb0cd..d839f422f03 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -86,8 +86,6 @@ public class PipeTabletEventTsFileBatch extends 
PipeTabletEventBatch {
   private static final String TS_FILE_PREFIX = "tb"; // tb means tablet batch
   private final AtomicLong tsFileIdGenerator = new AtomicLong(0);
 
-  private final long maxSizeInBytes;
-
   private final Map<Pair<String, Long>, Double> pipeName2WeightMap = new 
HashMap<>();
 
   private final List<Tablet> tabletList = new ArrayList<>();
@@ -97,9 +95,8 @@ public class PipeTabletEventTsFileBatch extends 
PipeTabletEventBatch {
   private volatile TsFileWriter fileWriter;
 
   public PipeTabletEventTsFileBatch(final int maxDelayInMs, final long 
requestMaxBatchSizeInBytes) {
-    super(maxDelayInMs);
+    super(maxDelayInMs, requestMaxBatchSizeInBytes);
 
-    this.maxSizeInBytes = requestMaxBatchSizeInBytes;
     try {
       this.batchFileBaseDir = getNextBaseDir();
     } catch (final Exception e) {
@@ -455,11 +452,6 @@ public class PipeTabletEventTsFileBatch extends 
PipeTabletEventBatch {
     }
   }
 
-  @Override
-  protected long getMaxBatchSizeInBytes() {
-    return maxSizeInBytes;
-  }
-
   @Override
   public synchronized void onSuccess() {
     super.onSuccess();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
new file mode 100644
index 00000000000..4e33b871828
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.memory;
+
+import org.apache.tsfile.utils.Pair;
+
+import javax.validation.constraints.NotNull;
+
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+public class PipeDynamicMemoryBlock {
+
+  private final PipeModelFixedMemoryBlock fixedMemoryBlock;
+
+  private boolean isExpandable = true;
+
+  private Consumer<PipeDynamicMemoryBlock> expand = null;
+
+  private volatile boolean released = false;
+
+  private volatile long memoryUsageInBytes;
+
+  private volatile double historyMemoryEfficiency;
+
+  private volatile double currentMemoryEfficiency;
+
+  PipeDynamicMemoryBlock(
+      final @NotNull PipeModelFixedMemoryBlock fixedMemoryBlock, final long 
memoryUsageInBytes) {
+    this.memoryUsageInBytes = Math.min(memoryUsageInBytes, 0);
+    this.fixedMemoryBlock = fixedMemoryBlock;
+  }
+
+  public long getMemoryUsageInBytes() {
+    return memoryUsageInBytes;
+  }
+
+  public void setMemoryUsageInBytes(final long memoryUsageInBytes) {
+    this.memoryUsageInBytes = memoryUsageInBytes;
+  }
+
+  public Pair<Double, Double> getMemoryEfficiency() {
+    synchronized (fixedMemoryBlock) {
+      return new Pair<>(historyMemoryEfficiency, currentMemoryEfficiency);
+    }
+  }
+
+  public void setExpandable(boolean expandable) {
+    isExpandable = expandable;
+  }
+
+  public void setExpand(Consumer<PipeDynamicMemoryBlock> expand) {
+    this.expand = expand;
+  }
+
+  public double getMemoryBlockUsageRatio() {
+    return (double) memoryUsageInBytes / 
fixedMemoryBlock.getMemoryUsageInBytes();
+  }
+
+  public double getFixedMemoryBlockUsageRatio() {
+    return (double) fixedMemoryBlock.getMemoryAllocatedInBytes()
+        / fixedMemoryBlock.getMemoryUsageInBytes();
+  }
+
+  public long canAllocateMemorySize() {
+    return fixedMemoryBlock.getMemoryUsageInBytes() - 
fixedMemoryBlock.getMemoryAllocatedInBytes();
+  }
+
+  public synchronized long getExpectedAverageAllocatedMemorySize() {
+    return fixedMemoryBlock.getMemoryUsageInBytes() / 
fixedMemoryBlock.getMemoryBlocks().size();
+  }
+
+  public void updateCurrentMemoryEfficiencyAdjustMem(double 
currentMemoryEfficiency) {
+    synchronized (fixedMemoryBlock) {
+      this.historyMemoryEfficiency = this.currentMemoryEfficiency;
+      if (Double.isNaN(currentMemoryEfficiency)
+          || Double.isInfinite(currentMemoryEfficiency)
+          || currentMemoryEfficiency < 0.0) {
+        currentMemoryEfficiency = 0.0;
+      }
+      this.currentMemoryEfficiency = Math.min(currentMemoryEfficiency, 1.0);
+      fixedMemoryBlock.dynamicallyAdjustMemory(this);
+    }
+  }
+
+  public long getFixedMemoryCapacity() {
+    return fixedMemoryBlock.getMemoryUsageInBytes();
+  }
+
+  public void updateMemoryEfficiency(
+      double currentMemoryEfficiency, double historyMemoryEfficiency) {
+    synchronized (fixedMemoryBlock) {
+      if (Double.isNaN(currentMemoryEfficiency)
+          || Double.isInfinite(currentMemoryEfficiency)
+          || currentMemoryEfficiency < 0.0) {
+        currentMemoryEfficiency = 0.0;
+      }
+
+      if (Double.isNaN(historyMemoryEfficiency)
+          || Double.isInfinite(historyMemoryEfficiency)
+          || historyMemoryEfficiency < 0.0) {
+        currentMemoryEfficiency = 0.0;
+      }
+
+      this.historyMemoryEfficiency = Math.min(historyMemoryEfficiency, 1.0);
+      this.currentMemoryEfficiency = Math.min(currentMemoryEfficiency, 1.0);
+    }
+  }
+
+  public Stream<PipeDynamicMemoryBlock> getMemoryBlocks() {
+    return fixedMemoryBlock.getMemoryBlocksStream();
+  }
+
+  public void applyForDynamicMemory(final long memoryUsageInBytes) {
+    fixedMemoryBlock.resetMemoryBlockSize(this, memoryUsageInBytes);
+  }
+
+  public boolean isReleased() {
+    return released;
+  }
+
+  public void close() {
+    if (released) {
+      return;
+    }
+    synchronized (fixedMemoryBlock) {
+      if (!released) {
+        fixedMemoryBlock.releaseMemory(this);
+        released = true;
+      }
+    }
+  }
+
+  void doExpand() {
+    if (isExpandable && expand != null) {
+      expand.accept(this);
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
index 5b626df04c3..846fc7dd1ce 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
@@ -23,4 +23,6 @@ public enum PipeMemoryBlockType {
   NORMAL,
   TABLET,
   TS_FILE,
+  BATCH,
+  WAL
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index dbade8bea9c..34934bef812 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalExc
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import 
org.apache.iotdb.db.pipe.resource.memory.strategy.ThresholdAllocationStrategy;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -103,6 +104,18 @@ public class PipeMemoryManager {
         * getTotalNonFloatingMemorySizeInBytes();
   }
 
+  public long getAllocatedMemorySizeInBytesOfWAL() {
+    return (long)
+        (PipeConfig.getInstance().getPipeDataStructureWalMemoryProportion()
+            * getTotalNonFloatingMemorySizeInBytes());
+  }
+
+  public long getAllocatedMemorySizeInBytesOfBatch() {
+    return (long)
+        (PipeConfig.getInstance().getPipeDataStructureBatchMemoryProportion()
+            * getTotalNonFloatingMemorySizeInBytes());
+  }
+
   public boolean isEnough4TabletParsing() {
     return (double) usedMemorySizeInBytesOfTablets + (double) 
usedMemorySizeInBytesOfTsFiles
             < EXCEED_PROTECT_THRESHOLD * 
allowedMaxMemorySizeInBytesOfTabletsAndTsFiles()
@@ -232,6 +245,41 @@ public class PipeMemoryManager {
     }
   }
 
+  public PipeModelFixedMemoryBlock forceAllocateForModelFixedMemoryBlock(
+      long fixedSizeInBytes, PipeMemoryBlockType type)
+      throws PipeRuntimeOutOfMemoryCriticalException {
+    if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
+      return new PipeModelFixedMemoryBlock(Long.MAX_VALUE, new 
ThresholdAllocationStrategy());
+    }
+
+    if (fixedSizeInBytes == 0) {
+      return (PipeModelFixedMemoryBlock) registerMemoryBlock(0, type);
+    }
+
+    for (int i = 1, size = 
PipeConfig.getInstance().getPipeMemoryAllocateMaxRetries();
+        i <= size;
+        i++) {
+      if (getFreeMemorySizeInBytes() >= fixedSizeInBytes) {
+        break;
+      }
+
+      try {
+        
Thread.sleep(PipeConfig.getInstance().getPipeMemoryAllocateRetryIntervalInMs());
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        LOGGER.warn("forceAllocateWithRetry: interrupted while waiting for 
available memory", ex);
+      }
+    }
+
+    if (getFreeMemorySizeInBytes() < fixedSizeInBytes) {
+      return (PipeModelFixedMemoryBlock) 
forceAllocateWithRetry(getFreeMemorySizeInBytes(), type);
+    }
+
+    synchronized (this) {
+      return (PipeModelFixedMemoryBlock) 
forceAllocateWithRetry(fixedSizeInBytes, type);
+    }
+  }
+
   private PipeMemoryBlock forceAllocateWithRetry(long sizeInBytes, 
PipeMemoryBlockType type)
       throws PipeRuntimeOutOfMemoryCriticalException {
     if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
@@ -240,6 +288,9 @@ public class PipeMemoryManager {
           return new PipeTabletMemoryBlock(sizeInBytes);
         case TS_FILE:
           return new PipeTsFileMemoryBlock(sizeInBytes);
+        case BATCH:
+        case WAL:
+          return new PipeModelFixedMemoryBlock(sizeInBytes, new 
ThresholdAllocationStrategy());
         default:
           return new PipeMemoryBlock(sizeInBytes);
       }
@@ -467,6 +518,11 @@ public class PipeMemoryManager {
       case TS_FILE:
         returnedMemoryBlock = new PipeTsFileMemoryBlock(sizeInBytes);
         break;
+      case BATCH:
+      case WAL:
+        returnedMemoryBlock =
+            new PipeModelFixedMemoryBlock(sizeInBytes, new 
ThresholdAllocationStrategy());
+        break;
       default:
         returnedMemoryBlock = new PipeMemoryBlock(sizeInBytes);
         break;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeModelFixedMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeModelFixedMemoryBlock.java
new file mode 100644
index 00000000000..647fb81a4b9
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeModelFixedMemoryBlock.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.memory;
+
+import 
org.apache.iotdb.db.pipe.resource.memory.strategy.DynamicMemoryAllocationStrategy;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+
+public class PipeModelFixedMemoryBlock extends PipeFixedMemoryBlock {
+
+  private final Set<PipeDynamicMemoryBlock> memoryBlocks =
+      Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+  private final DynamicMemoryAllocationStrategy allocationStrategy;
+
+  private volatile long memoryAllocatedInBytes;
+
+  public PipeModelFixedMemoryBlock(
+      final long memoryUsageInBytes, final DynamicMemoryAllocationStrategy 
allocationStrategy) {
+    super(memoryUsageInBytes);
+    this.memoryAllocatedInBytes = 0;
+    this.allocationStrategy = allocationStrategy;
+  }
+
+  public synchronized PipeDynamicMemoryBlock registerPipeBatchMemoryBlock(
+      final long memorySizeInBytes) {
+    final PipeDynamicMemoryBlock memoryBlock = new 
PipeDynamicMemoryBlock(this, 0);
+    memoryBlocks.add(memoryBlock);
+    if (memorySizeInBytes != 0) {
+      resetMemoryBlockSize(memoryBlock, memorySizeInBytes);
+      double e = (double) getMemoryUsageInBytes() / memorySizeInBytes;
+      memoryBlock.updateMemoryEfficiency(e, e);
+      return memoryBlock;
+    }
+
+    memoryBlock.updateMemoryEfficiency(0.0, 0.0);
+    return memoryBlock;
+  }
+
+  @Override
+  public synchronized boolean expand() {
+    // Ensure that the memory block that gets most of the memory is released 
first, which can reduce
+    // the jitter of memory allocationIf the memory block is not expanded, it 
will not be expanded
+    // again.This function not only completes the expansion but also the 
reduction.
+    memoryBlocks.stream()
+        .sorted((a, b) -> Long.compare(b.getMemoryUsageInBytes(), 
a.getMemoryUsageInBytes()))
+        .forEach(PipeDynamicMemoryBlock::doExpand);
+    return false;
+  }
+
+  public long getMemoryAllocatedInBytes() {
+    return memoryAllocatedInBytes;
+  }
+
+  public synchronized Set<PipeDynamicMemoryBlock> getMemoryBlocks() {
+    return memoryBlocks;
+  }
+
+  synchronized void releaseMemory(final PipeDynamicMemoryBlock memoryBlock) {
+    resetMemoryBlockSize(memoryBlock, 0);
+    memoryBlocks.remove(memoryBlock);
+  }
+
+  synchronized void dynamicallyAdjustMemory(final PipeDynamicMemoryBlock 
block) {
+    if (this.isReleased() || block.isReleased() || 
!memoryBlocks.contains(block)) {
+      throw new IllegalStateException("The memory block has been released");
+    }
+    allocationStrategy.dynamicallyAdjustMemory(block);
+  }
+
+  synchronized void resetMemoryBlockSize(
+      final PipeDynamicMemoryBlock block, final long memorySizeInBytes) {
+    if (this.isReleased() || block.isReleased() || 
!memoryBlocks.contains(block)) {
+      throw new IllegalStateException("The memory block has been released");
+    }
+
+    final long diff = memorySizeInBytes - block.getMemoryUsageInBytes();
+
+    // If the capacity is expanded, determine whether it will exceed the 
maximum value of the fixed
+    // module
+    if (getMemoryUsageInBytes() - memoryAllocatedInBytes < diff) {
+      // Pay attention to the order of calls, otherwise it will cause resource 
leakage
+      block.setMemoryUsageInBytes(
+          block.getMemoryUsageInBytes() + getMemoryUsageInBytes() - 
memoryAllocatedInBytes);
+      memoryAllocatedInBytes = getMemoryUsageInBytes();
+      return;
+    }
+
+    memoryAllocatedInBytes = memoryAllocatedInBytes + diff;
+    block.setMemoryUsageInBytes(memorySizeInBytes);
+  }
+
+  Stream<PipeDynamicMemoryBlock> getMemoryBlocksStream() {
+    if (isReleased()) {
+      throw new IllegalStateException("The memory block has been released");
+    }
+    return memoryBlocks.stream();
+  }
+
+  @Override
+  public synchronized void close() {
+    memoryBlocks.forEach(PipeDynamicMemoryBlock::close);
+    super.close();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/DynamicMemoryAllocationStrategy.java
similarity index 50%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/DynamicMemoryAllocationStrategy.java
index 5b626df04c3..8e5ba9af053 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/DynamicMemoryAllocationStrategy.java
@@ -17,10 +17,17 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.resource.memory;
+package org.apache.iotdb.db.pipe.resource.memory.strategy;
 
-public enum PipeMemoryBlockType {
-  NORMAL,
-  TABLET,
-  TS_FILE,
+import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
+
+// Now let's define the operation memory behavior: Producers produce memory, 
consumers consume
+// memory, and in order to ensure that consumers do not encounter back 
pressure, the memory that
+// consumers need to use is allocated in advance. Consumer instances obtain 
their expected memory
+// through allocation strategies, and the total memory of all consumer 
instances must not be greater
+// than the pre-allocated memory. The memory allocation algorithm is to adjust 
the memory of
+// consumers so that the consumption rate can reach the optimal
+public interface DynamicMemoryAllocationStrategy {
+
+  void dynamicallyAdjustMemory(PipeDynamicMemoryBlock dynamicMemoryBlock);
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/ThresholdAllocationStrategy.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/ThresholdAllocationStrategy.java
new file mode 100644
index 00000000000..72a390f799e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/ThresholdAllocationStrategy.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.memory.strategy;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+// The algorithm is optimized for different scenarios: The following describes 
the behavior of the
+// algorithm in different scenarios:
+// 1. When the memory is large enough, it will try to allocate memory to the 
memory block
+// 2. When the memory is insufficient, the algorithm will try to ensure that 
the memory with a
+// relatively large memory share is released
+public class ThresholdAllocationStrategy implements 
DynamicMemoryAllocationStrategy {
+
+  private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
+
+  @Override
+  public void dynamicallyAdjustMemory(final PipeDynamicMemoryBlock 
dynamicMemoryBlock) {
+    final double deficitRatio = calculateDeficitRatio(dynamicMemoryBlock);
+    final long oldMemoryUsageInBytes = 
dynamicMemoryBlock.getMemoryUsageInBytes();
+    final long expectedMemory = (long) (oldMemoryUsageInBytes / deficitRatio);
+    final double memoryBlockUsageRatio = 
dynamicMemoryBlock.getMemoryBlockUsageRatio();
+    final long maximumMemoryIncrease =
+        (long)
+            (dynamicMemoryBlock.getFixedMemoryCapacity()
+                * 
PIPE_CONFIG.getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio());
+
+    // Avoid overflow and infinite values
+    if (deficitRatio <= 0.0 || oldMemoryUsageInBytes == 0 || expectedMemory == 
0) {
+      dynamicMemoryBlock.applyForDynamicMemory(maximumMemoryIncrease);
+      final double efficiencyRatio =
+          (double) dynamicMemoryBlock.getMemoryUsageInBytes() / 
maximumMemoryIncrease;
+      dynamicMemoryBlock.updateMemoryEfficiency(efficiencyRatio, 
efficiencyRatio);
+      return;
+    }
+
+    // No matter what, give priority to applying for memory use, and adjust 
the memory size when the
+    // memory is insufficient
+    final double lowUsageThreshold =
+        PIPE_CONFIG.getPipeThresholdAllocationStrategyLowUsageThreshold();
+    if (dynamicMemoryBlock.getFixedMemoryBlockUsageRatio()
+        < 
PIPE_CONFIG.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold()) {
+      if (deficitRatio >= 1.0) {
+        return;
+      }
+
+      final long maxAvailableMemory =
+          Math.min(expectedMemory, dynamicMemoryBlock.canAllocateMemorySize());
+      long newMemoryRequest;
+
+      // Need to ensure that you get memory in smaller chunks and get more 
memory faster
+      if (memoryBlockUsageRatio > lowUsageThreshold) {
+        newMemoryRequest =
+            Math.min(oldMemoryUsageInBytes + oldMemoryUsageInBytes / 2, 
maxAvailableMemory);
+      } else {
+        newMemoryRequest = Math.min(oldMemoryUsageInBytes * 2, 
maxAvailableMemory);
+      }
+
+      dynamicMemoryBlock.applyForDynamicMemory(newMemoryRequest);
+      final double efficiencyRatio =
+          dynamicMemoryBlock.getMemoryUsageInBytes() / (double) expectedMemory;
+      dynamicMemoryBlock.updateMemoryEfficiency(efficiencyRatio, 
efficiencyRatio);
+      return;
+    }
+
+    // Entering this logic means that the memory is insufficient and the 
memory allocation needs to
+    // be adjusted
+    final AtomicBoolean isMemoryNotEnough = new AtomicBoolean(false);
+    final double averageDeficitRatio =
+        dynamicMemoryBlock
+            .getMemoryBlocks()
+            .mapToDouble(
+                block -> {
+                  double ratio = calculateDeficitRatio(block);
+                  if (block.getMemoryUsageInBytes() == 0 || ratio == 0.0) {
+                    isMemoryNotEnough.set(true);
+                  }
+                  return ratio;
+                })
+            .average()
+            .orElse(1.0);
+
+    final double adjustmentThreshold = 
PIPE_CONFIG.getPipeDynamicMemoryAdjustmentThreshold();
+    // When memory is insufficient, try to ensure that smaller memory blocks 
apply for less memory,
+    // and larger memory blocks release more memory.
+    final double diff =
+        isMemoryNotEnough.get() && averageDeficitRatio > 2 * 
adjustmentThreshold
+            ? averageDeficitRatio - deficitRatio - adjustmentThreshold
+            : averageDeficitRatio - deficitRatio;
+
+    if (Math.abs(diff) > 
PIPE_CONFIG.getPipeDynamicMemoryAdjustmentThreshold()) {
+      final long mem = (long) ((dynamicMemoryBlock.getMemoryUsageInBytes() / 
deficitRatio) * diff);
+      
dynamicMemoryBlock.applyForDynamicMemory(dynamicMemoryBlock.getMemoryUsageInBytes()
 + mem);
+      if (oldMemoryUsageInBytes != dynamicMemoryBlock.getMemoryUsageInBytes()) 
{
+        final double efficiencyRatio =
+            dynamicMemoryBlock.getMemoryUsageInBytes() / (double) 
expectedMemory;
+        dynamicMemoryBlock.updateMemoryEfficiency(efficiencyRatio, 
efficiencyRatio);
+      }
+    } else if (memoryBlockUsageRatio > lowUsageThreshold
+        && memoryBlockUsageRatio > 
dynamicMemoryBlock.getExpectedAverageAllocatedMemorySize()) {
+      // If there is insufficient memory, some memory must be released
+      dynamicMemoryBlock.applyForDynamicMemory(oldMemoryUsageInBytes / 2);
+      dynamicMemoryBlock.updateMemoryEfficiency(deficitRatio / 2, deficitRatio 
/ 2);
+    }
+  }
+
+  private double calculateDeficitRatio(final PipeDynamicMemoryBlock block) {
+    final Pair<Double, Double> memoryEfficiency = block.getMemoryEfficiency();
+    double pipeDynamicMemoryHistoryWeight = 
PIPE_CONFIG.getPipeDynamicMemoryHistoryWeight();
+    return (1 - pipeDynamicMemoryHistoryWeight) * memoryEfficiency.getRight()
+        + pipeDynamicMemoryHistoryWeight * memoryEfficiency.getLeft();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 983363689c7..b9419faf695 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -26,7 +26,9 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.metric.overview.PipeWALInsertNodeCacheMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
+import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
@@ -61,7 +63,14 @@ public class WALInsertNodeCache {
   private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
   private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
 
-  private final PipeMemoryBlock allocatedMemoryBlock;
+  private static final PipeModelFixedMemoryBlock WAL_MODEL_FIXED_MEMORY =
+      PipeDataNodeResourceManager.memory()
+          .forceAllocateForModelFixedMemoryBlock(
+              
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfWAL(),
+              PipeMemoryBlockType.WAL);
+
+  private final PipeDynamicMemoryBlock memoryBlock;
+
   // Used to adjust the memory usage of the cache
   private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
   private final AtomicBoolean isBatchLoadEnabled = new AtomicBoolean(true);
@@ -82,28 +91,12 @@ public class WALInsertNodeCache {
                     * CONFIG.getWalFileSizeThresholdInByte()
                     / CONFIG.getDataRegionNum(),
                 0.5 * CONFIG.getAllocateMemoryForPipe() / 
CONFIG.getDataRegionNum());
-    allocatedMemoryBlock =
-        PipeDataNodeResourceManager.memory()
-            .tryAllocate(requestedAllocateSize)
-            .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
-            .setExpandMethod(
-                oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, 
requestedAllocateSize))
-            .setExpandCallback(
-                (oldMemory, newMemory) -> {
-                  memoryUsageCheatFactor.updateAndGet(
-                      factor -> factor / ((double) newMemory / oldMemory));
-                  isBatchLoadEnabled.set(newMemory >= 
CONFIG.getWalFileSizeThresholdInByte());
-                  LOGGER.info(
-                      "WALInsertNodeCache.allocatedMemoryBlock of dataRegion 
{} has expanded from {} to {}.",
-                      dataRegionId,
-                      oldMemory,
-                      newMemory);
-                });
+    memoryBlock = 
WAL_MODEL_FIXED_MEMORY.registerPipeBatchMemoryBlock(requestedAllocateSize);
     isBatchLoadEnabled.set(
-        allocatedMemoryBlock.getMemoryUsageInBytes() >= 
CONFIG.getWalFileSizeThresholdInByte());
+        memoryBlock.getMemoryUsageInBytes() >= 
CONFIG.getWalFileSizeThresholdInByte());
     lruCache =
         Caffeine.newBuilder()
-            .maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
+            .maximumWeight(requestedAllocateSize)
             .weigher(
                 (Weigher<WALEntryPosition, Pair<ByteBuffer, InsertNode>>)
                     (position, pair) -> {
@@ -124,30 +117,51 @@ public class WALInsertNodeCache {
                     })
             .recordStats()
             .build(new WALInsertNodeCacheLoader());
-    allocatedMemoryBlock.setShrinkCallback(
-        (oldMemory, newMemory) -> {
-          memoryUsageCheatFactor.updateAndGet(factor -> factor * ((double) 
oldMemory / newMemory));
-          isBatchLoadEnabled.set(newMemory >= 
CONFIG.getWalFileSizeThresholdInByte());
-          LOGGER.info(
-              "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has 
shrunk from {} to {}.",
-              dataRegionId,
-              oldMemory,
-              newMemory);
-          if (CONFIG.getWALCacheShrinkClearEnabled()) {
-            try {
-              lruCache.cleanUp();
-            } catch (Exception e) {
-              LOGGER.warn(
-                  "Failed to clear WALInsertNodeCache for dataRegion ID: {}.", 
dataRegionId, e);
-              return;
-            }
-            LOGGER.info(
-                "Successfully cleared WALInsertNodeCache for dataRegion ID: 
{}.", dataRegionId);
+
+    memoryBlock.setExpandable(true);
+    memoryBlock.setExpand(
+        memoryBlock -> {
+          final long oldMemory = memoryBlock.getMemoryUsageInBytes();
+          
memoryBlock.updateCurrentMemoryEfficiencyAdjustMem(lruCache.stats().hitRate());
+          final long newMemory = memoryBlock.getMemoryUsageInBytes();
+          if (newMemory > oldMemory) {
+            setExpandCallback(oldMemory, newMemory, dataRegionId);
+          } else if (newMemory < oldMemory) {
+            shrinkCallback(oldMemory, newMemory, dataRegionId);
           }
         });
     PipeWALInsertNodeCacheMetrics.getInstance().register(this, dataRegionId);
   }
 
+  private void setExpandCallback(long oldMemory, long newMemory, Integer 
dataRegionId) {
+    memoryUsageCheatFactor.updateAndGet(factor -> factor / ((double) newMemory 
/ oldMemory));
+    isBatchLoadEnabled.set(newMemory >= 
CONFIG.getWalFileSizeThresholdInByte());
+    LOGGER.info(
+        "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has expanded 
from {} to {}.",
+        dataRegionId,
+        oldMemory,
+        newMemory);
+  }
+
+  private void shrinkCallback(long oldMemory, long newMemory, Integer 
dataRegionId) {
+    memoryUsageCheatFactor.updateAndGet(factor -> factor * ((double) oldMemory 
/ newMemory));
+    isBatchLoadEnabled.set(newMemory >= 
CONFIG.getWalFileSizeThresholdInByte());
+    LOGGER.info(
+        "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has shrunk 
from {} to {}.",
+        dataRegionId,
+        oldMemory,
+        newMemory);
+    if (CONFIG.getWALCacheShrinkClearEnabled()) {
+      try {
+        lruCache.cleanUp();
+      } catch (Exception e) {
+        LOGGER.warn("Failed to clear WALInsertNodeCache for dataRegion ID: 
{}.", dataRegionId, e);
+        return;
+      }
+      LOGGER.info("Successfully cleared WALInsertNodeCache for dataRegion ID: 
{}.", dataRegionId);
+    }
+  }
+
   /////////////////////////// Getter & Setter ///////////////////////////
 
   public InsertNode getInsertNode(final WALEntryPosition position) {
@@ -373,7 +387,7 @@ public class WALInsertNodeCache {
   @TestOnly
   public void clear() {
     lruCache.invalidateAll();
-    allocatedMemoryBlock.close();
+    memoryBlock.close();
     memTablesNeedSearch.clear();
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index c3f2e0f4a11..4a1c9ea104c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -216,8 +216,10 @@ public class CommonConfig {
 
   private int pipeDataStructureTabletRowSize = 2048;
   private int pipeDataStructureTabletSizeInBytes = 2097152;
-  private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 
0.4;
-  private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 
0.4;
+  private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 
0.2;
+  private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 
0.2;
+  private double pipeDataStructureWalMemoryProportion = 0.2;
+  private double PipeDataStructureBatchMemoryProportion = 0.2;
   private double pipeTotalFloatingMemoryProportion = 0.2;
 
   private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount = 
10_000;
@@ -302,6 +304,11 @@ public class CommonConfig {
   private PipeRemainingTimeRateAverageTime 
pipeRemainingTimeCommitRateAverageTime =
       PipeRemainingTimeRateAverageTime.MEAN;
   private double pipeTsFileScanParsingThreshold = 0.05;
+  private double pipeDynamicMemoryHistoryWeight = 0.5;
+  private double pipeDynamicMemoryAdjustmentThreshold = 0.05;
+  private double pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio = 
0.1d;
+  private double pipeThresholdAllocationStrategyLowUsageThreshold = 0.2d;
+  private double pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold 
= 0.8d;
 
   private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8 
minutes
   private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L; 
// 3 minutes
@@ -841,6 +848,34 @@ public class CommonConfig {
         pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold);
   }
 
+  public double getPipeDataStructureWalMemoryProportion() {
+    return pipeDataStructureWalMemoryProportion;
+  }
+
+  public void setPipeDataStructureWalMemoryProportion(double 
pipeDataStructureWalMemoryProportion) {
+    if (this.pipeDataStructureWalMemoryProportion == 
pipeDataStructureWalMemoryProportion) {
+      return;
+    }
+    this.pipeDataStructureWalMemoryProportion = 
pipeDataStructureWalMemoryProportion;
+    logger.info(
+        "pipeDataStructureWalMemoryProportion is set to {}.", 
pipeDataStructureWalMemoryProportion);
+  }
+
+  public double getPipeDataStructureBatchMemoryProportion() {
+    return PipeDataStructureBatchMemoryProportion;
+  }
+
+  public void setPipeDataStructureBatchMemoryProportion(
+      double PipeDataStructureBatchMemoryProportion) {
+    if (this.PipeDataStructureBatchMemoryProportion == 
PipeDataStructureBatchMemoryProportion) {
+      return;
+    }
+    this.PipeDataStructureBatchMemoryProportion = 
PipeDataStructureBatchMemoryProportion;
+    logger.info(
+        "PipeDataStructureBatchMemoryProportion is set to {}.",
+        PipeDataStructureBatchMemoryProportion);
+  }
+
   public double getPipeTotalFloatingMemoryProportion() {
     return pipeTotalFloatingMemoryProportion;
   }
@@ -1824,6 +1859,82 @@ public class CommonConfig {
     logger.info("pipeTsFileScanParsingThreshold is set to {}", 
pipeTsFileScanParsingThreshold);
   }
 
+  public double getPipeDynamicMemoryHistoryWeight() {
+    return pipeDynamicMemoryHistoryWeight;
+  }
+
+  public void setPipeDynamicMemoryHistoryWeight(double 
pipeDynamicMemoryHistoryWeight) {
+    if (this.pipeDynamicMemoryHistoryWeight == pipeDynamicMemoryHistoryWeight) 
{
+      return;
+    }
+    this.pipeDynamicMemoryHistoryWeight = pipeDynamicMemoryHistoryWeight;
+    logger.info("PipeDynamicMemoryHistoryWeight is set to {}", 
pipeDynamicMemoryHistoryWeight);
+  }
+
+  public double getPipeDynamicMemoryAdjustmentThreshold() {
+    return pipeDynamicMemoryAdjustmentThreshold;
+  }
+
+  public void setPipeDynamicMemoryAdjustmentThreshold(double 
pipeDynamicMemoryAdjustmentThreshold) {
+    if (this.pipeDynamicMemoryAdjustmentThreshold == 
pipeDynamicMemoryAdjustmentThreshold) {
+      return;
+    }
+    this.pipeDynamicMemoryAdjustmentThreshold = 
pipeDynamicMemoryAdjustmentThreshold;
+    logger.info(
+        "pipeDynamicMemoryAdjustmentThreshold is set to {}", 
pipeDynamicMemoryAdjustmentThreshold);
+  }
+
+  public double 
getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio() {
+    return pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio;
+  }
+
+  public void setPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio(
+      double pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio) {
+    if (this.pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio
+        == pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio) {
+      return;
+    }
+    this.pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio =
+        pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio;
+    logger.info(
+        "pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio is set to 
{}",
+        pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio);
+  }
+
+  public double getPipeThresholdAllocationStrategyLowUsageThreshold() {
+    return pipeThresholdAllocationStrategyLowUsageThreshold;
+  }
+
+  public void setPipeThresholdAllocationStrategyLowUsageThreshold(
+      double pipeThresholdAllocationStrategyLowUsageThreshold) {
+    if (this.pipeThresholdAllocationStrategyLowUsageThreshold
+        == pipeThresholdAllocationStrategyLowUsageThreshold) {
+      return;
+    }
+    this.pipeThresholdAllocationStrategyLowUsageThreshold =
+        pipeThresholdAllocationStrategyLowUsageThreshold;
+    logger.info(
+        "pipeMemoryBlockLowUsageThreshold is set to {}",
+        pipeThresholdAllocationStrategyLowUsageThreshold);
+  }
+
+  public double 
getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold() {
+    return pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold;
+  }
+
+  public void setPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold(
+      double pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold) {
+    if (this.pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold
+        == pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold) {
+      return;
+    }
+    this.pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold =
+        pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold;
+    logger.info(
+        "pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold is set 
to {}",
+        pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold);
+  }
+
   public double getPipeAllSinksRateLimitBytesPerSecond() {
     return pipeAllSinksRateLimitBytesPerSecond;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index c08fab09aff..63698dc46cd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -79,6 +79,14 @@ public class PipeConfig {
     return 
COMMON_CONFIG.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold();
   }
 
+  public double getPipeDataStructureWalMemoryProportion() {
+    return COMMON_CONFIG.getPipeDataStructureWalMemoryProportion();
+  }
+
+  public double getPipeDataStructureBatchMemoryProportion() {
+    return COMMON_CONFIG.getPipeDataStructureBatchMemoryProportion();
+  }
+
   public double getPipeTotalFloatingMemoryProportion() {
     return COMMON_CONFIG.getPipeTotalFloatingMemoryProportion();
   }
@@ -223,6 +231,26 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeTsFileScanParsingThreshold();
   }
 
+  public double getPipeDynamicMemoryHistoryWeight() {
+    return COMMON_CONFIG.getPipeDynamicMemoryHistoryWeight();
+  }
+
+  public double getPipeDynamicMemoryAdjustmentThreshold() {
+    return COMMON_CONFIG.getPipeDynamicMemoryAdjustmentThreshold();
+  }
+
+  public double 
getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio() {
+    return 
COMMON_CONFIG.getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio();
+  }
+
+  public double getPipeThresholdAllocationStrategyLowUsageThreshold() {
+    return COMMON_CONFIG.getPipeThresholdAllocationStrategyLowUsageThreshold();
+  }
+
+  public double 
getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold() {
+    return 
COMMON_CONFIG.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold();
+  }
+
   /////////////////////////////// Meta Consistency 
///////////////////////////////
 
   public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -487,6 +515,19 @@ public class PipeConfig {
         "PipeRemainingTimeCommitRateAverageTime: {}", 
getPipeRemainingTimeCommitRateAverageTime());
     LOGGER.info("PipeTsFileScanParsingThreshold(): {}", 
getPipeTsFileScanParsingThreshold());
 
+    LOGGER.info("PipeDynamicMemoryHistoryWeight: {}", 
getPipeDynamicMemoryHistoryWeight());
+    LOGGER.info(
+        "PipeDynamicMemoryAdjustmentThreshold: {}", 
getPipeDynamicMemoryAdjustmentThreshold());
+    LOGGER.info(
+        "PipeThresholdAllocationStrategyMaximumMemoryIncrementRatio: {}",
+        getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio());
+    LOGGER.info(
+        "PipeThresholdAllocationStrategyLowUsageThreshold: {}",
+        getPipeThresholdAllocationStrategyLowUsageThreshold());
+    LOGGER.info(
+        "PipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold: {}",
+        getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold());
+
     LOGGER.info(
         "PipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold: {}",
         getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index f25c285ae5a..ea2829172ce 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -228,6 +228,16 @@ public class PipeDescriptor {
                 
"pipe_data_structure_ts_file_memory_block_allocation_reject_threshold",
                 String.valueOf(
                     
config.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold()))));
+    config.setPipeDataStructureWalMemoryProportion(
+        Double.parseDouble(
+            properties.getProperty(
+                "pipe_data_structure_wal_memory_proportion",
+                
String.valueOf(config.getPipeDataStructureWalMemoryProportion()))));
+    config.setPipeDataStructureBatchMemoryProportion(
+        Double.parseDouble(
+            properties.getProperty(
+                "pipe_data_structure_batch_memory_proportion",
+                
String.valueOf(config.getPipeDataStructureBatchMemoryProportion()))));
     config.setPipeTotalFloatingMemoryProportion(
         Double.parseDouble(
             properties.getProperty(
@@ -520,6 +530,38 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_tsfile_scan_parsing_threshold",
                 String.valueOf(config.getPipeTsFileScanParsingThreshold()))));
+
+    config.setPipeDynamicMemoryHistoryWeight(
+        Double.parseDouble(
+            properties.getProperty(
+                "pipe_dynamic_memory_history_weight",
+                String.valueOf(config.getPipeDynamicMemoryHistoryWeight()))));
+
+    config.setPipeDynamicMemoryAdjustmentThreshold(
+        Double.parseDouble(
+            properties.getProperty(
+                "pipe_dynamic_memory_adjustment_threshold",
+                
String.valueOf(config.getPipeDynamicMemoryAdjustmentThreshold()))));
+
+    config.setPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio(
+        Double.parseDouble(
+            properties.getProperty(
+                
"pipe_threshold_allocation_strategy_maximum_memory_increment_ratio",
+                String.valueOf(
+                    
config.getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio()))));
+
+    config.setPipeThresholdAllocationStrategyLowUsageThreshold(
+        Double.parseDouble(
+            properties.getProperty(
+                "pipe_threshold_allocation_strategy_low_usage_threshold",
+                
String.valueOf(config.getPipeThresholdAllocationStrategyLowUsageThreshold()))));
+
+    config.setPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold(
+        Double.parseDouble(
+            properties.getProperty(
+                "pipe_threshold_allocation_strategy_high_usage_threshold",
+                String.valueOf(
+                    
config.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold()))));
   }
 
   public static void loadPipeExternalConfig(

Reply via email to