This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch logger-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/logger-13 by this push:
new c34d6f9e5fe Pipe: Optimized the logger semantic && the retry logic of
memory error at sink subtask (#17166)
c34d6f9e5fe is described below
commit c34d6f9e5fea4db3999001f72cff02508691c2cb
Author: Caideyipi <[email protected]>
AuthorDate: Thu Feb 5 17:21:26 2026 +0800
Pipe: Optimized the logger semantic && the retry logic of memory error at
sink subtask (#17166)
* shop
* fix
* sit
* logger
---
.../org/apache/iotdb/confignode/manager/ProcedureManager.java | 4 ++--
.../iotdb/confignode/persistence/pipe/PipePluginInfo.java | 4 ++--
.../iotdb/confignode/persistence/pipe/PipeTaskInfo.java | 4 ++--
.../procedure/impl/pipe/plugin/CreatePipePluginProcedure.java | 2 +-
.../procedure/impl/pipe/plugin/DropPipePluginProcedure.java | 2 +-
.../execution/config/executor/ClusterConfigTaskExecutor.java | 10 ++++++----
.../java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java | 1 +
.../pipe/agent/task/subtask/PipeAbstractSinkSubtask.java | 11 ++++++++++-
8 files changed, 25 insertions(+), 13 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 5f6450769f5..f0f1b9e6bfb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -1367,9 +1367,9 @@ public class ProcedureManager {
}
}
- public TSStatus alterPipe(TAlterPipeReq req) {
+ public TSStatus alterPipe(final TAlterPipeReq req) {
try {
- AlterPipeProcedureV2 procedure = new AlterPipeProcedureV2(req);
+ final AlterPipeProcedureV2 procedure = new AlterPipeProcedureV2(req);
executor.submitProcedure(procedure);
TSStatus status = waitingProcedureFinished(procedure);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 36206c05fcd..eae8eea1cbb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -155,8 +155,8 @@ public class PipePluginInfo implements SnapshotProcessor {
final String exceptionMessage =
String.format(
"Failed to create or alter pipe, the pipe extractor plugin %s
does not exist",
- extractorPluginName);
- LOGGER.warn(exceptionMessage);
+ extractorPluginName);
+ LOGGER.info(exceptionMessage);
throw new PipeException(exceptionMessage);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 08f96a25a61..2bfdbf9e49a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -185,7 +185,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
String.format(
"Failed to create pipe %s, %s",
createPipeRequest.getPipeName(), PIPE_ALREADY_EXIST_MSG);
- LOGGER.warn(exceptionMessage);
+ LOGGER.info(exceptionMessage);
throw new PipeException(exceptionMessage);
}
@@ -209,7 +209,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
final String exceptionMessage =
String.format(
"Failed to alter pipe %s, %s", alterPipeRequest.getPipeName(),
PIPE_NOT_EXIST_MSG);
- LOGGER.warn(exceptionMessage);
+ LOGGER.info(exceptionMessage);
throw new PipeException(exceptionMessage);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index e3a4719a719..f4fa738428d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -145,7 +145,7 @@ public class CreatePipePluginProcedure extends
AbstractNodeProcedure<CreatePipeP
}
} catch (PipeException e) {
// The pipe plugin has already created, we should end the procedure
- LOGGER.warn(
+ LOGGER.info(
"Pipe plugin {} is already created, end the
CreatePipePluginProcedure({})",
pluginName,
pluginName);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 665a3782a91..efbe1ee6ccd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -143,7 +143,7 @@ public class DropPipePluginProcedure extends
AbstractNodeProcedure<DropPipePlugi
subscriptionInfo.validatePipePluginUsageByTopic(pluginName);
} catch (PipeException e) {
// if the pipe plugin is a built-in plugin, we should not drop it
- LOGGER.warn(e.getMessage());
+ LOGGER.info(e.getMessage());
pipePluginCoordinator.unlock();
pipeTaskCoordinator.unlock();
setFailure(new ProcedureException(e.getMessage()));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 784e890b1e5..7b5b23e644a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -957,10 +957,12 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
.setPluginName(dropPipePluginStatement.getPluginName().toUpperCase())
.setIfExistsCondition(dropPipePluginStatement.hasIfExistsCondition()));
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
executionStatus.getCode()) {
- LOGGER.warn(
- "[{}] Failed to drop pipe plugin {}.",
- executionStatus,
- dropPipePluginStatement.getPluginName());
+ if (TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode() !=
executionStatus.getCode()) {
+ LOGGER.warn(
+ "[{}] Failed to drop pipe plugin {}.",
+ executionStatus,
+ dropPipePluginStatement.getPluginName());
+ }
future.setException(new IoTDBException(executionStatus));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 072cba8b55c..963cecceb10 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -109,6 +109,7 @@ public class ErrorHandlingUtils {
|| status.getCode() ==
TSStatusCode.CANNOT_FETCH_FI_STATE.getStatusCode()
|| status.getCode() ==
TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode()
|| status.getCode() ==
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
+ || status.getCode() ==
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode()
|| status.getCode() == TSStatusCode.QUERY_TIMEOUT.getStatusCode())
{
LOGGER.info(message);
} else {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 56e142d6efd..577b57e3cf7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -19,11 +19,13 @@
package org.apache.iotdb.commons.pipe.agent.task.subtask;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkCriticalException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
import org.apache.iotdb.commons.pipe.agent.task.execution.PipeSubtaskScheduler;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
@@ -33,6 +35,7 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -268,7 +271,13 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
protected void handleException(final Event event, final Exception e) {
- if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException) {
+ if (e instanceof PipeRuntimeOutOfMemoryCriticalException
+ || ExceptionUtils.getRootCause(e) instanceof
PipeRuntimeOutOfMemoryCriticalException) {
+ PipeLogger.log(
+ LOGGER::info,
+ e,
+ "Temporarily out of memory in pipe event transferring, will wait for
the memory to release.");
+ } else if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException)
{
if (lastExceptionTime == Long.MAX_VALUE) {
lastExceptionTime = System.currentTimeMillis();
}