This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c17894e2754a73c9004145b613da83e613618c00 Author: Zhenyu Luo <[email protected]> AuthorDate: Thu Aug 7 16:56:37 2025 +0800 Pipe: CN adds logic to check if Pipe is out of memory (#16119) * Pipe: CN adds logic to check if Pipe is out of memory * update AbstractOperatePipeProcedureV2 * update AbstractOperatePipeProcedureV2 * add it (cherry picked from commit d0788a87d8789f2ac2f5ff76c538c650e311dafc) --- .../it/env/cluster/config/MppCommonConfig.java | 6 ++ .../env/cluster/config/MppSharedCommonConfig.java | 6 ++ .../it/env/remote/config/RemoteCommonConfig.java | 5 + .../org/apache/iotdb/itbase/env/CommonConfig.java | 2 + .../tablemodel/manual/basic/IoTDBPipeMemoryIT.java | 101 +++++++++++++++++++++ .../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 + .../impl/pipe/AbstractOperatePipeProcedureV2.java | 28 ++++++ .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 9 +- .../impl/DataNodeInternalRPCServiceImpl.java | 23 +++-- .../commons/pipe/agent/task/PipeTaskAgent.java | 1 + 10 files changed, 173 insertions(+), 9 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index ef3856b8068..acaaf35beb6 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -567,6 +567,12 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig { return this; } + @Override + public CommonConfig setDatanodeMemoryProportion(String datanodeMemoryProportion) { + setProperty("datanode_memory_proportion", datanodeMemoryProportion); + return this; + } + // For part of the log directory public String getClusterConfigStr() { return fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS)) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index 59578423c61..2082c257b26 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -587,4 +587,10 @@ public class MppSharedCommonConfig implements CommonConfig { cnConfig.setDefaultStorageGroupLevel(defaultStorageGroupLevel); return this; } + + public CommonConfig setDatanodeMemoryProportion(String datanodeMemoryProportion) { + dnConfig.setDatanodeMemoryProportion(datanodeMemoryProportion); + cnConfig.setDatanodeMemoryProportion(datanodeMemoryProportion); + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index 62526dca5b2..83404a0c55a 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -406,4 +406,9 @@ public class RemoteCommonConfig implements CommonConfig { public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) { return this; } + + @Override + public CommonConfig setDatanodeMemoryProportion(String datanodeMemoryProportion) { + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index ba503551e9c..fd5490b0c2e 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -183,4 +183,6 @@ public interface CommonConfig { default CommonConfig setDefaultStorageGroupLevel(int defaultStorageGroupLevel) { return this; } + + CommonConfig setDatanodeMemoryProportion(String datanodeMemoryProportion); } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeMemoryIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeMemoryIT.java new file mode 100644 index 00000000000..757ddcda36a --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeMemoryIT.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualBasic; +import org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.HashMap; +import java.util.Map; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2DualTableManualBasic.class}) +public class IoTDBPipeMemoryIT extends AbstractPipeTableModelDualManualIT { + + @Override + @Before + public void setUp() { + super.setUp(); + } + + @Override + protected void setupConfig() { + super.setupConfig(); + senderEnv + .getConfig() + .getCommonConfig() + .setPipeMemoryManagementEnabled(true) + .setIsPipeEnableMemoryCheck(true) + .setDatanodeMemoryProportion("1000:1000:1000:1000:1:1000"); + receiverEnv + .getConfig() + .getCommonConfig() + .setPipeMemoryManagementEnabled(true) + .setIsPipeEnableMemoryCheck(true) + .setDatanodeMemoryProportion("1000:1000:1000:1000:1:1000"); + } + + @Test + public void testCreatePipeMemoryManage() { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map<String, String> extractorAttributes = new HashMap<>(); + final Map<String, String> processorAttributes = new HashMap<>(); + final Map<String, String> connectorAttributes = new HashMap<>(); + + extractorAttributes.put("capture.table", "true"); + extractorAttributes.put("user", "root"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertNotEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + Assert.assertNotNull(status.getMessage()); + Assert.assertTrue(status.getMessage().contains("Not enough memory for pipe.")); + + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } +} diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index d153e425dba..31b27c8bf99 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -286,6 +286,7 @@ public enum TSStatusCode { PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED(1811), PIPE_TRANSFER_SLICE_OUT_OF_ORDER(1812), PIPE_PUSH_META_TIMEOUT(1813), + PIPE_PUSH_META_NOT_ENOUGH_MEMORY(1814), // Subscription SUBSCRIPTION_VERSION_ERROR(1900), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java index bca1b74b29b..bd343412660 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java @@ -428,11 +428,33 @@ public abstract class AbstractOperatePipeProcedureV2 public static String parsePushPipeMetaExceptionForPipe( final String pipeName, final Map<Integer, TPushPipeMetaResp> respMap) { final StringBuilder exceptionMessageBuilder = new StringBuilder(); + final StringBuilder enoughMemoryMessageBuilder = new StringBuilder(); for (final Map.Entry<Integer, TPushPipeMetaResp> respEntry : respMap.entrySet()) { final int dataNodeId = respEntry.getKey(); final TPushPipeMetaResp resp = respEntry.getValue(); + if (resp.getStatus().getCode() + == TSStatusCode.PIPE_PUSH_META_NOT_ENOUGH_MEMORY.getStatusCode()) { + exceptionMessageBuilder.append(String.format("DataNodeId: %s,", dataNodeId)); + resp.getExceptionMessages() + .forEach( + message -> { + // Ignore the timeStamp for simplicity + if (pipeName == null) { + enoughMemoryMessageBuilder.append( + String.format( + "PipeName: %s, Message: %s", + message.getPipeName(), message.getMessage())); + } else if (pipeName.equals(message.getPipeName())) { + enoughMemoryMessageBuilder.append( + String.format("Message: %s", message.getMessage())); + } + }); + enoughMemoryMessageBuilder.append("."); + continue; + } + if (resp.getStatus().getCode() == TSStatusCode.PIPE_PUSH_META_TIMEOUT.getStatusCode()) { exceptionMessageBuilder.append( String.format( @@ -476,6 +498,12 @@ public abstract class AbstractOperatePipeProcedureV2 } } } + + final String enoughMemoryMessage = enoughMemoryMessageBuilder.toString(); + if (!enoughMemoryMessage.isEmpty()) { + throw new PipeException(enoughMemoryMessage); + } + return exceptionMessageBuilder.toString(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index e722a199220..d15391fc226 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -717,7 +717,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { if (freeMemorySizeInBytes < needMemory + reservedMemorySizeInBytes) { final String message = String.format( - "Not enough memory for pipe. Need memory: %d bytes, free memory: %d bytes, reserved memory: %d bytes, total memory: %d bytes", + "%s Need memory: %d bytes, free memory: %d bytes, reserved memory: %d bytes, total memory: %d bytes", + MESSAGE_PIPE_NOT_ENOUGH_MEMORY, needMemory, freeMemorySizeInBytes, freeMemorySizeInBytes, @@ -765,8 +766,10 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { if (remainingMemory < PipeConfig.getInstance().PipeInsertNodeQueueMemory()) { final String message = String.format( - "Not enough memory for pipe. Need Floating memory: %d bytes, free Floating memory: %d bytes", - PipeConfig.getInstance().PipeInsertNodeQueueMemory(), remainingMemory); + "%s Need Floating memory: %d bytes, free Floating memory: %d bytes", + MESSAGE_PIPE_NOT_ENOUGH_MEMORY, + PipeConfig.getInstance().PipeInsertNodeQueueMemory(), + remainingMemory); LOGGER.warn(message); throw new PipeException(message); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 711ab737951..ccf86590ad9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -61,6 +61,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta; +import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; import org.apache.iotdb.commons.schema.SchemaConstant; import org.apache.iotdb.commons.schema.cache.CacheClearOptions; @@ -1203,13 +1204,23 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface } else { throw new Exception("Invalid TPushSinglePipeMetaReq"); } - return exceptionMessage == null - ? new TPushPipeMetaResp() - .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())) - : new TPushPipeMetaResp() - .setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode())) + if (exceptionMessage != null) { + if (exceptionMessage.message != null + && exceptionMessage.message.contains(PipeTaskAgent.MESSAGE_PIPE_NOT_ENOUGH_MEMORY)) { + return new TPushPipeMetaResp() + .setStatus( + new TSStatus(TSStatusCode.PIPE_PUSH_META_NOT_ENOUGH_MEMORY.getStatusCode())) .setExceptionMessages(Collections.singletonList(exceptionMessage)); - } catch (final Exception e) { + } + + return new TPushPipeMetaResp() + .setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode())) + .setExceptionMessages(Collections.singletonList(exceptionMessage)); + } + + return new TPushPipeMetaResp() + .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + } catch (Exception e) { LOGGER.error("Error occurred when pushing single pipe meta", e); return new TPushPipeMetaResp() .setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode())); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index 779871def33..10c92eb2695 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -81,6 +81,7 @@ public abstract class PipeTaskAgent { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskAgent.class); + public static final String MESSAGE_PIPE_NOT_ENOUGH_MEMORY = "Not enough memory for pipe."; protected static final String MESSAGE_UNKNOWN_PIPE_STATUS = "Unknown pipe status %s for pipe %s"; protected static final String MESSAGE_UNEXPECTED_PIPE_STATUS = "Unexpected pipe status %s: ";
