This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7a8de6dd4ab Pipe: Fix the problem that the initialization of Pipe
consumer memory module caused DN writing to get stuck (#15645)
7a8de6dd4ab is described below
commit 7a8de6dd4ab7ec503e884128fadd1d7bdc6808e5
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Jun 5 10:43:18 2025 +0800
Pipe: Fix the problem that the initialization of Pipe consumer memory
module caused DN writing to get stuck (#15645)
* Pipe: Fix the problem that the initialization of Pipe consumer memory
module caused DN writing to get stuck
* fix
* fix
* fix
---
.../db/pipe/agent/runtime/PipeAgentLauncher.java | 4 ++
.../evolvable/batch/PipeTabletEventBatch.java | 33 +++++++++++++---
.../dataregion/wal/utils/WALInsertNodeCache.java | 44 ++++++++++++++++++----
3 files changed, 67 insertions(+), 14 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 1b56a00b325..96e2e4e7299 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -31,10 +31,12 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventBatch;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.service.ResourcesInformationHolder;
+import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALInsertNodeCache;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -159,6 +161,8 @@ class PipeAgentLauncher {
try (final ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
final TGetAllPipeInfoResp getAllPipeInfoResp =
configNodeClient.getAllPipeInfo();
+ WALInsertNodeCache.init();
+ PipeTabletEventBatch.init();
if (getAllPipeInfoResp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new StartupException("Failed to get pipe task meta from config
node.");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
index 9bff91c50ad..91dade4b915 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -40,11 +40,7 @@ import java.util.Objects;
public abstract class PipeTabletEventBatch implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTabletEventBatch.class);
- private static final PipeModelFixedMemoryBlock PIPE_MODEL_FIXED_MEMORY_BLOCK
=
- PipeDataNodeResourceManager.memory()
- .forceAllocateForModelFixedMemoryBlock(
-
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfBatch(),
- PipeMemoryBlockType.BATCH);
+ private static PipeModelFixedMemoryBlock pipeModelFixedMemoryBlock = null;
protected final List<EnrichedEvent> events = new ArrayList<>();
@@ -57,11 +53,15 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
protected volatile boolean isClosed = false;
protected PipeTabletEventBatch(final int maxDelayInMs, final long
requestMaxBatchSizeInBytes) {
+ if (pipeModelFixedMemoryBlock == null) {
+ init();
+ }
+
this.maxDelayInMs = maxDelayInMs;
// limit in buffer size
this.allocatedMemoryBlock =
-
PIPE_MODEL_FIXED_MEMORY_BLOCK.registerPipeBatchMemoryBlock(requestMaxBatchSizeInBytes);
+
pipeModelFixedMemoryBlock.registerPipeBatchMemoryBlock(requestMaxBatchSizeInBytes);
allocatedMemoryBlock.setExpandable(false);
if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
@@ -190,4 +190,25 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
public boolean isEmpty() {
return events.isEmpty();
}
+
+ // Call this method from PipeAgentLauncher to initialize the pipe model fixed memory block.
+ public static void init() {
+ if (pipeModelFixedMemoryBlock != null) {
+ return;
+ }
+
+ try {
+ pipeModelFixedMemoryBlock =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateForModelFixedMemoryBlock(
+
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfBatch(),
+ PipeMemoryBlockType.BATCH);
+ } catch (Exception e) {
+ LOGGER.error("init pipe model fixed memory block failed", e);
+ // If the allocation fails, we still need to create a default memory
block to avoid NPE.
+ pipeModelFixedMemoryBlock =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateForModelFixedMemoryBlock(0,
PipeMemoryBlockType.BATCH);
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 20469f2b798..7ca049698ec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -66,11 +66,7 @@ public class WALInsertNodeCache {
IoTDBDescriptor.getInstance().getMemoryConfig();
private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
- private static final PipeModelFixedMemoryBlock WAL_MODEL_FIXED_MEMORY =
- PipeDataNodeResourceManager.memory()
- .forceAllocateForModelFixedMemoryBlock(
-
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfWAL(),
- PipeMemoryBlockType.WAL);
+ private static PipeModelFixedMemoryBlock walModelFixedMemory = null;
private final PipeDynamicMemoryBlock memoryBlock;
@@ -86,6 +82,10 @@ public class WALInsertNodeCache {
private volatile boolean hasPipeRunning = false;
private WALInsertNodeCache(final Integer dataRegionId) {
+ if (walModelFixedMemory == null) {
+ init();
+ }
+
final long requestedAllocateSize =
(long)
Math.min(
@@ -96,7 +96,7 @@ public class WALInsertNodeCache {
0.5
*
MEMORY_CONFIG.getPipeMemoryManager().getTotalMemorySizeInBytes()
/ CONFIG.getDataRegionNum());
- memoryBlock =
WAL_MODEL_FIXED_MEMORY.registerPipeBatchMemoryBlock(requestedAllocateSize);
+ memoryBlock =
walModelFixedMemory.registerPipeBatchMemoryBlock(requestedAllocateSize);
isBatchLoadEnabled.set(
memoryBlock.getMemoryUsageInBytes() >=
CONFIG.getWalFileSizeThresholdInByte());
lruCache =
@@ -139,7 +139,11 @@ public class WALInsertNodeCache {
}
private void setExpandCallback(long oldMemory, long newMemory, Integer
dataRegionId) {
- memoryUsageCheatFactor.updateAndGet(factor -> factor / ((double) newMemory
/ oldMemory));
+ memoryUsageCheatFactor.updateAndGet(
+ factor ->
+ factor == 0L || newMemory == 0L || oldMemory == 0
+ ? 0.0
+ : factor / ((double) newMemory / oldMemory));
isBatchLoadEnabled.set(newMemory >=
CONFIG.getWalFileSizeThresholdInByte());
LOGGER.info(
"WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has expanded
from {} to {}.",
@@ -149,7 +153,11 @@ public class WALInsertNodeCache {
}
private void shrinkCallback(long oldMemory, long newMemory, Integer
dataRegionId) {
- memoryUsageCheatFactor.updateAndGet(factor -> factor * ((double) oldMemory
/ newMemory));
+ memoryUsageCheatFactor.updateAndGet(
+ factor ->
+ factor == 0L || newMemory == 0L || oldMemory == 0
+ ? 0.0
+ : factor * ((double) oldMemory / newMemory));
isBatchLoadEnabled.set(newMemory >=
CONFIG.getWalFileSizeThresholdInByte());
LOGGER.info(
"WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has shrunk
from {} to {}.",
@@ -167,6 +175,26 @@ public class WALInsertNodeCache {
}
}
+ // Call this method from PipeAgentLauncher to initialize the WAL model fixed memory block.
+ public static void init() {
+ if (walModelFixedMemory != null) {
+ return;
+ }
+ try {
+ // Allocate memory for the fixed memory block of WAL
+ walModelFixedMemory =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateForModelFixedMemoryBlock(
+
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfWAL(),
+ PipeMemoryBlockType.WAL);
+ } catch (Exception e) {
+ LOGGER.error("Failed to initialize WAL model fixed memory block", e);
+ walModelFixedMemory =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateForModelFixedMemoryBlock(0,
PipeMemoryBlockType.WAL);
+ }
+ }
+
/////////////////////////// Getter & Setter ///////////////////////////
public InsertNode getInsertNode(final WALEntryPosition position) {