This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e22a54f44b2 Pipe: Fixed the issue that a potential memory shortage may 
lead to forever blocking (#16570)
e22a54f44b2 is described below

commit e22a54f44b242c1f6591375bb2945adf706e6e4c
Author: Caideyipi <[email protected]>
AuthorDate: Mon Oct 13 15:32:20 2025 +0800

    Pipe: Fixed the issue that a potential memory shortage may lead to forever 
blocking (#16570)
---
 .../agent/task/subtask/sink/PipeSinkSubtask.java   |  3 +++
 .../exception/pipe/PipeNonReportException.java     | 27 ++++++++++++++++++++++
 .../pipe/receiver/PipeReceiverStatusHandler.java   |  4 ++--
 3 files changed, 32 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index d3c94173848..308ddfb90d2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
 
+import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
 import 
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
@@ -131,6 +132,8 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
       }
 
       decreaseReferenceCountAndReleaseLastEvent(event, true);
+    } catch (final PipeNonReportException e) {
+      // Intentionally not reported: ignore it and go directly to the next round
     } catch (final PipeException e) {
       if (!isClosed.get()) {
         setLastExceptionEvent(event);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeNonReportException.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeNonReportException.java
new file mode 100644
index 00000000000..572a7764518
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeNonReportException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.exception.pipe;
+
+public class PipeNonReportException extends PipeRuntimeNonCriticalException {
+
+  public PipeNonReportException(final String message) {
+    super(message);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 8f00e31c28a..3bb9e14d181 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.commons.pipe.receiver;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
-import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkCriticalException;
+import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
 import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
@@ -127,7 +127,7 @@ public class PipeReceiverStatusHandler {
               LOGGER::info,
               "Temporary unavailable exception: will retry forever. status: 
%s",
               status);
-          throw new PipeRuntimeSinkCriticalException(exceptionMessage);
+          throw new PipeNonReportException(exceptionMessage);
         }
 
       case 1810: // PIPE_RECEIVER_USER_CONFLICT_EXCEPTION

Reply via email to