This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c17894e2754a73c9004145b613da83e613618c00
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Aug 7 16:56:37 2025 +0800

    Pipe: CN adds logic to check if Pipe is out of memory (#16119)
    
    * Pipe: CN adds logic to check if Pipe is out of memory
    
    * update AbstractOperatePipeProcedureV2
    
    * update AbstractOperatePipeProcedureV2
    
    * add it
    
    (cherry picked from commit d0788a87d8789f2ac2f5ff76c538c650e311dafc)
---
 .../it/env/cluster/config/MppCommonConfig.java     |   6 ++
 .../env/cluster/config/MppSharedCommonConfig.java  |   6 ++
 .../it/env/remote/config/RemoteCommonConfig.java   |   5 +
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   2 +
 .../tablemodel/manual/basic/IoTDBPipeMemoryIT.java | 101 +++++++++++++++++++++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  |  28 ++++++
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |   9 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  23 +++--
 .../commons/pipe/agent/task/PipeTaskAgent.java     |   1 +
 10 files changed, 173 insertions(+), 9 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index ef3856b8068..acaaf35beb6 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -567,6 +567,12 @@ public class MppCommonConfig extends MppBaseConfig 
implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setDatanodeMemoryProportion(String 
datanodeMemoryProportion) {
+    setProperty("datanode_memory_proportion", datanodeMemoryProportion);
+    return this;
+  }
+
   // For part of the log directory
   public String getClusterConfigStr() {
     return 
fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 59578423c61..2082c257b26 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -587,4 +587,10 @@ public class MppSharedCommonConfig implements CommonConfig 
{
     cnConfig.setDefaultStorageGroupLevel(defaultStorageGroupLevel);
     return this;
   }
+
+  public CommonConfig setDatanodeMemoryProportion(String 
datanodeMemoryProportion) {
+    dnConfig.setDatanodeMemoryProportion(datanodeMemoryProportion);
+    cnConfig.setDatanodeMemoryProportion(datanodeMemoryProportion);
+    return this;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 62526dca5b2..83404a0c55a 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -406,4 +406,9 @@ public class RemoteCommonConfig implements CommonConfig {
   public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
     return this;
   }
+
+  @Override
+  public CommonConfig setDatanodeMemoryProportion(String 
datanodeMemoryProportion) {
+    return this;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index ba503551e9c..fd5490b0c2e 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -183,4 +183,6 @@ public interface CommonConfig {
   default CommonConfig setDefaultStorageGroupLevel(int 
defaultStorageGroupLevel) {
     return this;
   }
+
+  CommonConfig setDatanodeMemoryProportion(String datanodeMemoryProportion);
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeMemoryIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeMemoryIT.java
new file mode 100644
index 00000000000..757ddcda36a
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeMemoryIT.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualBasic;
+import 
org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2DualTableManualBasic.class})
+public class IoTDBPipeMemoryIT extends AbstractPipeTableModelDualManualIT {
+
+  @Override
+  @Before
+  public void setUp() {
+    super.setUp();
+  }
+
+  @Override
+  protected void setupConfig() {
+    super.setupConfig();
+    senderEnv
+        .getConfig()
+        .getCommonConfig()
+        .setPipeMemoryManagementEnabled(true)
+        .setIsPipeEnableMemoryCheck(true)
+        .setDatanodeMemoryProportion("1000:1000:1000:1000:1:1000");
+    receiverEnv
+        .getConfig()
+        .getCommonConfig()
+        .setPipeMemoryManagementEnabled(true)
+        .setIsPipeEnableMemoryCheck(true)
+        .setDatanodeMemoryProportion("1000:1000:1000:1000:1:1000");
+  }
+
+  @Test
+  public void testCreatePipeMemoryManage() {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("capture.table", "true");
+      extractorAttributes.put("user", "root");
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+
+      final TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertNotEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+      Assert.assertNotNull(status.getMessage());
+      Assert.assertTrue(status.getMessage().contains("Not enough memory for 
pipe."));
+
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index d153e425dba..31b27c8bf99 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -286,6 +286,7 @@ public enum TSStatusCode {
   PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED(1811),
   PIPE_TRANSFER_SLICE_OUT_OF_ORDER(1812),
   PIPE_PUSH_META_TIMEOUT(1813),
+  PIPE_PUSH_META_NOT_ENOUGH_MEMORY(1814),
 
   // Subscription
   SUBSCRIPTION_VERSION_ERROR(1900),
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index bca1b74b29b..bd343412660 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -428,11 +428,33 @@ public abstract class AbstractOperatePipeProcedureV2
   public static String parsePushPipeMetaExceptionForPipe(
       final String pipeName, final Map<Integer, TPushPipeMetaResp> respMap) {
     final StringBuilder exceptionMessageBuilder = new StringBuilder();
+    final StringBuilder enoughMemoryMessageBuilder = new StringBuilder();
 
     for (final Map.Entry<Integer, TPushPipeMetaResp> respEntry : 
respMap.entrySet()) {
       final int dataNodeId = respEntry.getKey();
       final TPushPipeMetaResp resp = respEntry.getValue();
 
+      if (resp.getStatus().getCode()
+          == TSStatusCode.PIPE_PUSH_META_NOT_ENOUGH_MEMORY.getStatusCode()) {
+        exceptionMessageBuilder.append(String.format("DataNodeId: %s,", 
dataNodeId));
+        resp.getExceptionMessages()
+            .forEach(
+                message -> {
+                  // Ignore the timeStamp for simplicity
+                  if (pipeName == null) {
+                    enoughMemoryMessageBuilder.append(
+                        String.format(
+                            "PipeName: %s, Message: %s",
+                            message.getPipeName(), message.getMessage()));
+                  } else if (pipeName.equals(message.getPipeName())) {
+                    enoughMemoryMessageBuilder.append(
+                        String.format("Message: %s", message.getMessage()));
+                  }
+                });
+        enoughMemoryMessageBuilder.append(".");
+        continue;
+      }
+
       if (resp.getStatus().getCode() == 
TSStatusCode.PIPE_PUSH_META_TIMEOUT.getStatusCode()) {
         exceptionMessageBuilder.append(
             String.format(
@@ -476,6 +498,12 @@ public abstract class AbstractOperatePipeProcedureV2
         }
       }
     }
+
+    final String enoughMemoryMessage = enoughMemoryMessageBuilder.toString();
+    if (!enoughMemoryMessage.isEmpty()) {
+      throw new PipeException(enoughMemoryMessage);
+    }
+
     return exceptionMessageBuilder.toString();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index e722a199220..d15391fc226 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -717,7 +717,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     if (freeMemorySizeInBytes < needMemory + reservedMemorySizeInBytes) {
       final String message =
           String.format(
-              "Not enough memory for pipe. Need memory: %d bytes, free memory: 
%d bytes, reserved memory: %d bytes, total memory: %d bytes",
+              "%s Need memory: %d bytes, free memory: %d bytes, reserved 
memory: %d bytes, total memory: %d bytes",
+              MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
               needMemory,
               freeMemorySizeInBytes,
               freeMemorySizeInBytes,
@@ -765,8 +766,10 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     if (remainingMemory < 
PipeConfig.getInstance().PipeInsertNodeQueueMemory()) {
       final String message =
           String.format(
-              "Not enough memory for pipe. Need Floating memory: %d  bytes, 
free Floating memory: %d bytes",
-              PipeConfig.getInstance().PipeInsertNodeQueueMemory(), 
remainingMemory);
+              "%s Need Floating memory: %d  bytes, free Floating memory: %d 
bytes",
+              MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
+              PipeConfig.getInstance().PipeInsertNodeQueueMemory(),
+              remainingMemory);
       LOGGER.warn(message);
       throw new PipeException(message);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 711ab737951..ccf86590ad9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -61,6 +61,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathDeserializeUtil;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
 import org.apache.iotdb.commons.schema.SchemaConstant;
 import org.apache.iotdb.commons.schema.cache.CacheClearOptions;
@@ -1203,13 +1204,23 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
       } else {
         throw new Exception("Invalid TPushSinglePipeMetaReq");
       }
-      return exceptionMessage == null
-          ? new TPushPipeMetaResp()
-              .setStatus(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))
-          : new TPushPipeMetaResp()
-              .setStatus(new 
TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
+      if (exceptionMessage != null) {
+        if (exceptionMessage.message != null
+            && 
exceptionMessage.message.contains(PipeTaskAgent.MESSAGE_PIPE_NOT_ENOUGH_MEMORY))
 {
+          return new TPushPipeMetaResp()
+              .setStatus(
+                  new 
TSStatus(TSStatusCode.PIPE_PUSH_META_NOT_ENOUGH_MEMORY.getStatusCode()))
               
.setExceptionMessages(Collections.singletonList(exceptionMessage));
-    } catch (final Exception e) {
+        }
+
+        return new TPushPipeMetaResp()
+            .setStatus(new 
TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
+            .setExceptionMessages(Collections.singletonList(exceptionMessage));
+      }
+
+      return new TPushPipeMetaResp()
+          .setStatus(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+    } catch (Exception e) {
       LOGGER.error("Error occurred when pushing single pipe meta", e);
       return new TPushPipeMetaResp()
           .setStatus(new 
TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 779871def33..10c92eb2695 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -81,6 +81,7 @@ public abstract class PipeTaskAgent {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTaskAgent.class);
 
+  public static final String MESSAGE_PIPE_NOT_ENOUGH_MEMORY = "Not enough 
memory for pipe.";
   protected static final String MESSAGE_UNKNOWN_PIPE_STATUS = "Unknown pipe 
status %s for pipe %s";
   protected static final String MESSAGE_UNEXPECTED_PIPE_STATUS = "Unexpected 
pipe status %s: ";
 

Reply via email to