This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 6b210542416 [To dev/1.3] Pipe: Fix the problem that the initialization 
of Pipe consumer memory module caused DN writing to get stuck (#15645) (#15648)
6b210542416 is described below

commit 6b210542416124205523fcf2a772f2b150ae9ed2
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Jun 5 11:19:37 2025 +0800

    [To dev/1.3] Pipe: Fix the problem that the initialization of Pipe consumer 
memory module caused DN writing to get stuck (#15645) (#15648)
---
 .../db/pipe/agent/runtime/PipeAgentLauncher.java   |  4 ++
 .../evolvable/batch/PipeTabletEventBatch.java      | 33 +++++++++++++---
 .../dataregion/wal/utils/WALInsertNodeCache.java   | 44 ++++++++++++++++++----
 3 files changed, 67 insertions(+), 14 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 1b56a00b325..96e2e4e7299 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -31,10 +31,12 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventBatch;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
 import org.apache.iotdb.db.service.ResourcesInformationHolder;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALInsertNodeCache;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -159,6 +161,8 @@ class PipeAgentLauncher {
     try (final ConfigNodeClient configNodeClient =
         
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
       final TGetAllPipeInfoResp getAllPipeInfoResp = 
configNodeClient.getAllPipeInfo();
+      WALInsertNodeCache.init();
+      PipeTabletEventBatch.init();
       if (getAllPipeInfoResp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         throw new StartupException("Failed to get pipe task meta from config 
node.");
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
index 9bff91c50ad..91dade4b915 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -40,11 +40,7 @@ import java.util.Objects;
 public abstract class PipeTabletEventBatch implements AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTabletEventBatch.class);
-  private static final PipeModelFixedMemoryBlock PIPE_MODEL_FIXED_MEMORY_BLOCK 
=
-      PipeDataNodeResourceManager.memory()
-          .forceAllocateForModelFixedMemoryBlock(
-              
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfBatch(),
-              PipeMemoryBlockType.BATCH);
+  private static PipeModelFixedMemoryBlock pipeModelFixedMemoryBlock = null;
 
   protected final List<EnrichedEvent> events = new ArrayList<>();
 
@@ -57,11 +53,15 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
   protected volatile boolean isClosed = false;
 
   protected PipeTabletEventBatch(final int maxDelayInMs, final long 
requestMaxBatchSizeInBytes) {
+    if (pipeModelFixedMemoryBlock == null) {
+      init();
+    }
+
     this.maxDelayInMs = maxDelayInMs;
 
     // limit in buffer size
     this.allocatedMemoryBlock =
-        
PIPE_MODEL_FIXED_MEMORY_BLOCK.registerPipeBatchMemoryBlock(requestMaxBatchSizeInBytes);
+        
pipeModelFixedMemoryBlock.registerPipeBatchMemoryBlock(requestMaxBatchSizeInBytes);
     allocatedMemoryBlock.setExpandable(false);
 
     if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
@@ -190,4 +190,25 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
   public boolean isEmpty() {
     return events.isEmpty();
   }
+
+  // please at PipeLauncher call this method to init pipe model fixed memory 
block
+  public static void init() {
+    if (pipeModelFixedMemoryBlock != null) {
+      return;
+    }
+
+    try {
+      pipeModelFixedMemoryBlock =
+          PipeDataNodeResourceManager.memory()
+              .forceAllocateForModelFixedMemoryBlock(
+                  
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfBatch(),
+                  PipeMemoryBlockType.BATCH);
+    } catch (Exception e) {
+      LOGGER.error("init pipe model fixed memory block failed", e);
+      // If the allocation fails, we still need to create a default memory 
block to avoid NPE.
+      pipeModelFixedMemoryBlock =
+          PipeDataNodeResourceManager.memory()
+              .forceAllocateForModelFixedMemoryBlock(0, 
PipeMemoryBlockType.BATCH);
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index b9419faf695..b6ce0dc5de5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -63,11 +63,7 @@ public class WALInsertNodeCache {
   private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
   private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
 
-  private static final PipeModelFixedMemoryBlock WAL_MODEL_FIXED_MEMORY =
-      PipeDataNodeResourceManager.memory()
-          .forceAllocateForModelFixedMemoryBlock(
-              
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfWAL(),
-              PipeMemoryBlockType.WAL);
+  private static PipeModelFixedMemoryBlock walModelFixedMemory = null;
 
   private final PipeDynamicMemoryBlock memoryBlock;
 
@@ -83,6 +79,10 @@ public class WALInsertNodeCache {
   private volatile boolean hasPipeRunning = false;
 
   private WALInsertNodeCache(final Integer dataRegionId) {
+    if (walModelFixedMemory == null) {
+      init();
+    }
+
     final long requestedAllocateSize =
         (long)
             Math.min(
@@ -91,7 +91,7 @@ public class WALInsertNodeCache {
                     * CONFIG.getWalFileSizeThresholdInByte()
                     / CONFIG.getDataRegionNum(),
                 0.5 * CONFIG.getAllocateMemoryForPipe() / 
CONFIG.getDataRegionNum());
-    memoryBlock = 
WAL_MODEL_FIXED_MEMORY.registerPipeBatchMemoryBlock(requestedAllocateSize);
+    memoryBlock = 
walModelFixedMemory.registerPipeBatchMemoryBlock(requestedAllocateSize);
     isBatchLoadEnabled.set(
         memoryBlock.getMemoryUsageInBytes() >= 
CONFIG.getWalFileSizeThresholdInByte());
     lruCache =
@@ -134,7 +134,11 @@ public class WALInsertNodeCache {
   }
 
   private void setExpandCallback(long oldMemory, long newMemory, Integer 
dataRegionId) {
-    memoryUsageCheatFactor.updateAndGet(factor -> factor / ((double) newMemory 
/ oldMemory));
+    memoryUsageCheatFactor.updateAndGet(
+        factor ->
+            factor == 0L || newMemory == 0L || oldMemory == 0
+                ? 0.0
+                : factor / ((double) newMemory / oldMemory));
     isBatchLoadEnabled.set(newMemory >= 
CONFIG.getWalFileSizeThresholdInByte());
     LOGGER.info(
         "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has expanded 
from {} to {}.",
@@ -144,7 +148,11 @@ public class WALInsertNodeCache {
   }
 
   private void shrinkCallback(long oldMemory, long newMemory, Integer 
dataRegionId) {
-    memoryUsageCheatFactor.updateAndGet(factor -> factor * ((double) oldMemory 
/ newMemory));
+    memoryUsageCheatFactor.updateAndGet(
+        factor ->
+            factor == 0L || newMemory == 0L || oldMemory == 0
+                ? 0.0
+                : factor * ((double) oldMemory / newMemory));
     isBatchLoadEnabled.set(newMemory >= 
CONFIG.getWalFileSizeThresholdInByte());
     LOGGER.info(
         "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has shrunk 
from {} to {}.",
@@ -162,6 +170,26 @@ public class WALInsertNodeCache {
     }
   }
 
+  // please call this method at PipeLauncher
+  public static void init() {
+    if (walModelFixedMemory != null) {
+      return;
+    }
+    try {
+      // Allocate memory for the fixed memory block of WAL
+      walModelFixedMemory =
+          PipeDataNodeResourceManager.memory()
+              .forceAllocateForModelFixedMemoryBlock(
+                  
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfWAL(),
+                  PipeMemoryBlockType.WAL);
+    } catch (Exception e) {
+      LOGGER.error("Failed to initialize WAL model fixed memory block", e);
+      walModelFixedMemory =
+          PipeDataNodeResourceManager.memory()
+              .forceAllocateForModelFixedMemoryBlock(0, 
PipeMemoryBlockType.WAL);
+    }
+  }
+
   /////////////////////////// Getter & Setter ///////////////////////////
 
   public InsertNode getInsertNode(final WALEntryPosition position) {

Reply via email to