This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 6b210542416 [To dev/1.3] Pipe: Fix the problem that the initialization
of Pipe consumer memory module caused DN writing to get stuck (#15645) (#15648)
6b210542416 is described below
commit 6b210542416124205523fcf2a772f2b150ae9ed2
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Jun 5 11:19:37 2025 +0800
[To dev/1.3] Pipe: Fix the problem that the initialization of Pipe consumer
memory module caused DN writing to get stuck (#15645) (#15648)
---
.../db/pipe/agent/runtime/PipeAgentLauncher.java | 4 ++
.../evolvable/batch/PipeTabletEventBatch.java | 33 +++++++++++++---
.../dataregion/wal/utils/WALInsertNodeCache.java | 44 ++++++++++++++++++----
3 files changed, 67 insertions(+), 14 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 1b56a00b325..96e2e4e7299 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -31,10 +31,12 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventBatch;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.service.ResourcesInformationHolder;
+import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALInsertNodeCache;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -159,6 +161,8 @@ class PipeAgentLauncher {
try (final ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
final TGetAllPipeInfoResp getAllPipeInfoResp =
configNodeClient.getAllPipeInfo();
+ WALInsertNodeCache.init();
+ PipeTabletEventBatch.init();
if (getAllPipeInfoResp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new StartupException("Failed to get pipe task meta from config
node.");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
index 9bff91c50ad..91dade4b915 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -40,11 +40,7 @@ import java.util.Objects;
public abstract class PipeTabletEventBatch implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTabletEventBatch.class);
- private static final PipeModelFixedMemoryBlock PIPE_MODEL_FIXED_MEMORY_BLOCK
=
- PipeDataNodeResourceManager.memory()
- .forceAllocateForModelFixedMemoryBlock(
-
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfBatch(),
- PipeMemoryBlockType.BATCH);
+ private static PipeModelFixedMemoryBlock pipeModelFixedMemoryBlock = null;
protected final List<EnrichedEvent> events = new ArrayList<>();
@@ -57,11 +53,15 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
protected volatile boolean isClosed = false;
protected PipeTabletEventBatch(final int maxDelayInMs, final long
requestMaxBatchSizeInBytes) {
+ if (pipeModelFixedMemoryBlock == null) {
+ init();
+ }
+
this.maxDelayInMs = maxDelayInMs;
// limit in buffer size
this.allocatedMemoryBlock =
-
PIPE_MODEL_FIXED_MEMORY_BLOCK.registerPipeBatchMemoryBlock(requestMaxBatchSizeInBytes);
+
pipeModelFixedMemoryBlock.registerPipeBatchMemoryBlock(requestMaxBatchSizeInBytes);
allocatedMemoryBlock.setExpandable(false);
if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
@@ -190,4 +190,25 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
public boolean isEmpty() {
return events.isEmpty();
}
+
+ // please at PipeLauncher call this method to init pipe model fixed memory
block
+ public static void init() {
+ if (pipeModelFixedMemoryBlock != null) {
+ return;
+ }
+
+ try {
+ pipeModelFixedMemoryBlock =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateForModelFixedMemoryBlock(
+
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfBatch(),
+ PipeMemoryBlockType.BATCH);
+ } catch (Exception e) {
+ LOGGER.error("init pipe model fixed memory block failed", e);
+ // If the allocation fails, we still need to create a default memory
block to avoid NPE.
+ pipeModelFixedMemoryBlock =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateForModelFixedMemoryBlock(0,
PipeMemoryBlockType.BATCH);
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index b9419faf695..b6ce0dc5de5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -63,11 +63,7 @@ public class WALInsertNodeCache {
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
- private static final PipeModelFixedMemoryBlock WAL_MODEL_FIXED_MEMORY =
- PipeDataNodeResourceManager.memory()
- .forceAllocateForModelFixedMemoryBlock(
-
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfWAL(),
- PipeMemoryBlockType.WAL);
+ private static PipeModelFixedMemoryBlock walModelFixedMemory = null;
private final PipeDynamicMemoryBlock memoryBlock;
@@ -83,6 +79,10 @@ public class WALInsertNodeCache {
private volatile boolean hasPipeRunning = false;
private WALInsertNodeCache(final Integer dataRegionId) {
+ if (walModelFixedMemory == null) {
+ init();
+ }
+
final long requestedAllocateSize =
(long)
Math.min(
@@ -91,7 +91,7 @@ public class WALInsertNodeCache {
* CONFIG.getWalFileSizeThresholdInByte()
/ CONFIG.getDataRegionNum(),
0.5 * CONFIG.getAllocateMemoryForPipe() /
CONFIG.getDataRegionNum());
- memoryBlock =
WAL_MODEL_FIXED_MEMORY.registerPipeBatchMemoryBlock(requestedAllocateSize);
+ memoryBlock =
walModelFixedMemory.registerPipeBatchMemoryBlock(requestedAllocateSize);
isBatchLoadEnabled.set(
memoryBlock.getMemoryUsageInBytes() >=
CONFIG.getWalFileSizeThresholdInByte());
lruCache =
@@ -134,7 +134,11 @@ public class WALInsertNodeCache {
}
private void setExpandCallback(long oldMemory, long newMemory, Integer
dataRegionId) {
- memoryUsageCheatFactor.updateAndGet(factor -> factor / ((double) newMemory
/ oldMemory));
+ memoryUsageCheatFactor.updateAndGet(
+ factor ->
+ factor == 0L || newMemory == 0L || oldMemory == 0
+ ? 0.0
+ : factor / ((double) newMemory / oldMemory));
isBatchLoadEnabled.set(newMemory >=
CONFIG.getWalFileSizeThresholdInByte());
LOGGER.info(
"WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has expanded
from {} to {}.",
@@ -144,7 +148,11 @@ public class WALInsertNodeCache {
}
private void shrinkCallback(long oldMemory, long newMemory, Integer
dataRegionId) {
- memoryUsageCheatFactor.updateAndGet(factor -> factor * ((double) oldMemory
/ newMemory));
+ memoryUsageCheatFactor.updateAndGet(
+ factor ->
+ factor == 0L || newMemory == 0L || oldMemory == 0
+ ? 0.0
+ : factor * ((double) oldMemory / newMemory));
isBatchLoadEnabled.set(newMemory >=
CONFIG.getWalFileSizeThresholdInByte());
LOGGER.info(
"WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has shrunk
from {} to {}.",
@@ -162,6 +170,26 @@ public class WALInsertNodeCache {
}
}
+ // please call this method at PipeLauncher
+ public static void init() {
+ if (walModelFixedMemory != null) {
+ return;
+ }
+ try {
+ // Allocate memory for the fixed memory block of WAL
+ walModelFixedMemory =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateForModelFixedMemoryBlock(
+
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfWAL(),
+ PipeMemoryBlockType.WAL);
+ } catch (Exception e) {
+ LOGGER.error("Failed to initialize WAL model fixed memory block", e);
+ walModelFixedMemory =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateForModelFixedMemoryBlock(0,
PipeMemoryBlockType.WAL);
+ }
+ }
+
/////////////////////////// Getter & Setter ///////////////////////////
public InsertNode getInsertNode(final WALEntryPosition position) {