This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e22a54f44b2 Pipe: Fixed the potential memory shortage may lead to
forever blocking (#16570)
e22a54f44b2 is described below
commit e22a54f44b242c1f6591375bb2945adf706e6e4c
Author: Caideyipi <[email protected]>
AuthorDate: Mon Oct 13 15:32:20 2025 +0800
Pipe: Fixed the potential memory shortage may lead to forever blocking
(#16570)
---
.../agent/task/subtask/sink/PipeSinkSubtask.java | 3 +++
.../exception/pipe/PipeNonReportException.java | 27 ++++++++++++++++++++++
.../pipe/receiver/PipeReceiverStatusHandler.java | 4 ++--
3 files changed, 32 insertions(+), 2 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index d3c94173848..308ddfb90d2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
+import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
@@ -131,6 +132,8 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
}
decreaseReferenceCountAndReleaseLastEvent(event, true);
+ } catch (final PipeNonReportException e) {
+ // Ignore, go directly next round
} catch (final PipeException e) {
if (!isClosed.get()) {
setLastExceptionEvent(event);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeNonReportException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeNonReportException.java
new file mode 100644
index 00000000000..572a7764518
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeNonReportException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.exception.pipe;
+
+public class PipeNonReportException extends PipeRuntimeNonCriticalException {
+
+ public PipeNonReportException(final String message) {
+ super(message);
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 8f00e31c28a..3bb9e14d181 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.commons.pipe.receiver;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
-import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkCriticalException;
+import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
@@ -127,7 +127,7 @@ public class PipeReceiverStatusHandler {
LOGGER::info,
"Temporary unavailable exception: will retry forever. status: %s",
status);
- throw new PipeRuntimeSinkCriticalException(exceptionMessage);
+ throw new PipeNonReportException(exceptionMessage);
}
case 1810: // PIPE_RECEIVER_USER_CONFLICT_EXCEPTION