This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 73721013ad6 [To dev/1.3] Pipe: CN adds logic to check if Pipe is out of memory #16119 (#16120)
73721013ad6 is described below
commit 73721013ad64bc3e32d05c8982a66563d4721281
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Aug 8 10:11:50 2025 +0800
[To dev/1.3] Pipe: CN adds logic to check if Pipe is out of memory #16119 (#16120)
* Pipe: CN adds logic to check if Pipe is out of memory
* update AbstractOperatePipeProcedureV2
* update AbstractOperatePipeProcedureV2
* add it
---
.../it/env/cluster/config/MppCommonConfig.java | 6 ++
.../env/cluster/config/MppSharedCommonConfig.java | 6 ++
.../it/env/remote/config/RemoteCommonConfig.java | 5 ++
.../org/apache/iotdb/itbase/env/CommonConfig.java | 2 +
.../pipe/it/autocreate/IoTDBPipeMemoryIT.java | 98 ++++++++++++++++++++++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 28 +++++++
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 9 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 21 +++--
.../commons/pipe/agent/task/PipeTaskAgent.java | 1 +
10 files changed, 169 insertions(+), 8 deletions(-)
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 2c1059272da..553890dafe7 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -543,6 +543,12 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setDatanodeMemoryProportion(String datanodeMemoryProportion) {
+ setProperty("datanode_memory_proportion", datanodeMemoryProportion);
+ return this;
+ }
+
// For part of the log directory
public String getClusterConfigStr() {
return
fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index d131bf862c0..03516a2e07c 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -559,4 +559,10 @@ public class MppSharedCommonConfig implements CommonConfig {
cnConfig.setDefaultStorageGroupLevel(defaultStorageGroupLevel);
return this;
}
+
+ public CommonConfig setDatanodeMemoryProportion(String datanodeMemoryProportion) {
+ dnConfig.setDatanodeMemoryProportion(datanodeMemoryProportion);
+ cnConfig.setDatanodeMemoryProportion(datanodeMemoryProportion);
+ return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 32061709b27..a79a56852f1 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -386,4 +386,9 @@ public class RemoteCommonConfig implements CommonConfig {
public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
return this;
}
+
+ @Override
+ public CommonConfig setDatanodeMemoryProportion(String datanodeMemoryProportion) {
+ return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 24e23a31e5f..7c9a2d00521 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -175,4 +175,6 @@ public interface CommonConfig {
default CommonConfig setDefaultStorageGroupLevel(int
defaultStorageGroupLevel) {
return this;
}
+
+ CommonConfig setDatanodeMemoryProportion(String datanodeMemoryProportion);
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeMemoryIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeMemoryIT.java
new file mode 100644
index 00000000000..bb211a1ffe8
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeMemoryIT.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.autocreate;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2AutoCreateSchema.class})
+public class IoTDBPipeMemoryIT extends AbstractPipeDualAutoIT {
+
+ @Override
+ @Before
+ public void setUp() {
+ super.setUp();
+ }
+
+ @Override
+ protected void setupConfig() {
+ super.setupConfig();
+ senderEnv
+ .getConfig()
+ .getCommonConfig()
+ .setPipeMemoryManagementEnabled(true)
+ .setIsPipeEnableMemoryCheck(true)
+ .setDatanodeMemoryProportion("1000:1000:1000:1000:1:1000");
+ receiverEnv
+ .getConfig()
+ .getCommonConfig()
+ .setPipeMemoryManagementEnabled(true)
+ .setIsPipeEnableMemoryCheck(true)
+ .setDatanodeMemoryProportion("1000:1000:1000:1000:1:1000");
+ }
+
+ @Test
+ public void testCreatePipeMemoryManage() {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+ extractorAttributes.put("user", "root");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertNotEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ Assert.assertNotNull(status.getMessage());
+ Assert.assertTrue(status.getMessage().contains("Not enough memory for pipe."));
+
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+}
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index e7780a98f92..a4b7ad9bd3a 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -261,6 +261,7 @@ public enum TSStatusCode {
PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED(1811),
PIPE_TRANSFER_SLICE_OUT_OF_ORDER(1812),
PIPE_PUSH_META_TIMEOUT(1813),
+ PIPE_PUSH_META_NOT_ENOUGH_MEMORY(1814),
// Subscription
SUBSCRIPTION_VERSION_ERROR(1900),
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index eb4ffc48c79..07059d6a0d9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -424,11 +424,33 @@ public abstract class AbstractOperatePipeProcedureV2
public static String parsePushPipeMetaExceptionForPipe(
final String pipeName, final Map<Integer, TPushPipeMetaResp> respMap) {
final StringBuilder exceptionMessageBuilder = new StringBuilder();
+ final StringBuilder enoughMemoryMessageBuilder = new StringBuilder();
for (final Map.Entry<Integer, TPushPipeMetaResp> respEntry :
respMap.entrySet()) {
final int dataNodeId = respEntry.getKey();
final TPushPipeMetaResp resp = respEntry.getValue();
+ if (resp.getStatus().getCode()
+ == TSStatusCode.PIPE_PUSH_META_NOT_ENOUGH_MEMORY.getStatusCode()) {
+ exceptionMessageBuilder.append(String.format("DataNodeId: %s,", dataNodeId));
+ resp.getExceptionMessages()
+ .forEach(
+ message -> {
+ // Ignore the timeStamp for simplicity
+ if (pipeName == null) {
+ enoughMemoryMessageBuilder.append(
+ String.format(
+ "PipeName: %s, Message: %s",
+ message.getPipeName(), message.getMessage()));
+ } else if (pipeName.equals(message.getPipeName())) {
+ enoughMemoryMessageBuilder.append(
+ String.format("Message: %s", message.getMessage()));
+ }
+ });
+ enoughMemoryMessageBuilder.append(".");
+ continue;
+ }
+
if (resp.getStatus().getCode() ==
TSStatusCode.PIPE_PUSH_META_TIMEOUT.getStatusCode()) {
exceptionMessageBuilder.append(
String.format(
@@ -472,6 +494,12 @@ public abstract class AbstractOperatePipeProcedureV2
}
}
}
+
+ final String enoughMemoryMessage = enoughMemoryMessageBuilder.toString();
+ if (!enoughMemoryMessage.isEmpty()) {
+ throw new PipeException(enoughMemoryMessage);
+ }
+
return exceptionMessageBuilder.toString();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 4266f658df1..0e8ddef54ea 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -692,7 +692,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
if (freeMemorySizeInBytes < needMemory + reservedMemorySizeInBytes) {
final String message =
String.format(
- "Not enough memory for pipe. Need memory: %d bytes, free memory: %d bytes, reserved memory: %d bytes, total memory: %d bytes",
+ "%s Need memory: %d bytes, free memory: %d bytes, reserved memory: %d bytes, total memory: %d bytes",
+ MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
needMemory,
freeMemorySizeInBytes,
freeMemorySizeInBytes,
@@ -739,8 +740,10 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
if (remainingMemory <
PipeConfig.getInstance().PipeInsertNodeQueueMemory()) {
final String message =
String.format(
- "Not enough memory for pipe. Need Floating memory: %d bytes, free Floating memory: %d bytes",
- PipeConfig.getInstance().PipeInsertNodeQueueMemory(), remainingMemory);
+ "%s Need Floating memory: %d bytes, free Floating memory: %d bytes",
+ MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
+ PipeConfig.getInstance().PipeInsertNodeQueueMemory(),
+ remainingMemory);
LOGGER.warn(message);
throw new PipeException(message);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 5d7fa891530..d4c35b700cb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -59,6 +59,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
@@ -1124,12 +1125,22 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
} else {
throw new Exception("Invalid TPushSinglePipeMetaReq");
}
- return exceptionMessage == null
- ? new TPushPipeMetaResp()
- .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))
- : new TPushPipeMetaResp()
- .setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
+ if (exceptionMessage != null) {
+ if (exceptionMessage.message != null
+ && exceptionMessage.message.contains(PipeTaskAgent.MESSAGE_PIPE_NOT_ENOUGH_MEMORY)) {
+ return new TPushPipeMetaResp()
+ .setStatus(
+ new TSStatus(TSStatusCode.PIPE_PUSH_META_NOT_ENOUGH_MEMORY.getStatusCode()))
.setExceptionMessages(Collections.singletonList(exceptionMessage));
+ }
+
+ return new TPushPipeMetaResp()
+ .setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
+ .setExceptionMessages(Collections.singletonList(exceptionMessage));
+ }
+
+ return new TPushPipeMetaResp()
+ .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
} catch (Exception e) {
LOGGER.error("Error occurred when pushing single pipe meta", e);
return new TPushPipeMetaResp()
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index c7758526028..4ada5479d10 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -79,6 +79,7 @@ public abstract class PipeTaskAgent {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTaskAgent.class);
+ public static final String MESSAGE_PIPE_NOT_ENOUGH_MEMORY = "Not enough memory for pipe.";
protected static final String MESSAGE_UNKNOWN_PIPE_STATUS = "Unknown pipe status %s for pipe %s";
protected static final String MESSAGE_UNEXPECTED_PIPE_STATUS = "Unexpected pipe status %s: ";