This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch cg2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/cg2 by this push:
new 63e7e52abe5 Pipe: Fixed the semantic of reporting interval && Trimmed
the "toString" of InsertMultiTabletsStatement (#17044)
63e7e52abe5 is described below
commit 63e7e52abe5f4c4670647e4bb7306934e17bc866
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jan 20 16:35:00 2026 +0800
Pipe: Fixed the semantic of reporting interval && Trimmed the "toString" of
InsertMultiTabletsStatement (#17044)
* log
* bz
* partial
* ex-t
* fix
* del
* refactor
* suppress
* sonar
---
.../pipe/agent/task/PipeConfigNodeSubtask.java | 27 +----------
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 6 ++-
.../subtask/processor/PipeProcessorSubtask.java | 8 ++--
.../agent/task/subtask/sink/PipeSinkSubtask.java | 40 +++--------------
.../PipeHistoricalDataRegionTsFileSource.java | 3 +-
.../queryengine/execution/QueryStateMachine.java | 2 +-
.../execution/executor/RegionReadExecutor.java | 4 +-
.../execution/schedule/AbstractDriverThread.java | 1 -
.../crud/InsertMultiTabletsStatement.java | 6 ++-
.../apache/iotdb/db/utils/ErrorHandlingUtils.java | 14 ++----
...imeSinkNonReportTimeConfigurableException.java} | 40 +++++++++++++----
...RuntimeSinkRetryTimesConfigurableException.java | 27 ++++++++++-
.../task/subtask/PipeAbstractSinkSubtask.java | 52 ++++++++++++++++++++++
.../pipe/config/constant/SystemConstant.java | 2 +-
.../pipe/receiver/PipeReceiverStatusHandler.java | 36 +++++----------
.../ErrorHandlingCommonUtils.java} | 16 ++++---
16 files changed, 163 insertions(+), 121 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
index 66778f25893..6f52cd33ab0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.confignode.manager.pipe.agent.task;
-import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
@@ -41,7 +40,6 @@ import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -192,30 +190,8 @@ public class PipeConfigNodeSubtask extends
PipeAbstractSinkSubtask {
}
decreaseReferenceCountAndReleaseLastEvent(event, true);
sleepInterval =
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
- } catch (final PipeNonReportException e) {
- sleep4NonReportException();
- } catch (final PipeException e) {
- setLastExceptionEvent(event);
- if (!isClosed.get()) {
- throw e;
- } else {
- LOGGER.info(
- "{} in pipe transfer, ignored because pipe is dropped.",
- e.getClass().getSimpleName(),
- e);
- clearReferenceCountAndReleaseLastEvent(event);
- }
} catch (final Exception e) {
- setLastExceptionEvent(event);
- if (!isClosed.get()) {
- throw new PipeException(
- String.format(
- "Exception in pipe transfer, subtask: %s, last event: %s",
taskID, lastEvent),
- e);
- } else {
- LOGGER.info("Exception in pipe transfer, ignored because pipe is
dropped.", e);
- clearReferenceCountAndReleaseLastEvent(event);
- }
+ handleException(event, e);
}
return true;
@@ -260,6 +236,7 @@ public class PipeConfigNodeSubtask extends
PipeAbstractSinkSubtask {
@Override
protected void report(final EnrichedEvent event, final PipeRuntimeException
exception) {
+ lastExceptionTime = Long.MAX_VALUE;
PipeConfigNodeAgent.runtime().report(event, exception);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index cabf17b7f4f..64d88d22683 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -781,7 +781,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
// If the source is not history, we do not need to allocate memory
boolean isExtractorHistory =
sourceParameters.getBooleanOrDefault(
- SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
SystemConstant.RESTART_DEFAULT_VALUE)
+ SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
+ SystemConstant.RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE)
|| sourceParameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
@@ -865,7 +866,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
// If the source is history enable, we need to transfer tsfile
boolean needTransferTsFile =
sourceParameters.getBooleanOrDefault(
- SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
SystemConstant.RESTART_DEFAULT_VALUE)
+ SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
+ SystemConstant.RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE)
|| sourceParameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index ac68fe6ca93..ca5a8d0f4db 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeReportableSubtask;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
+import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
@@ -36,7 +37,6 @@ import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
import org.apache.iotdb.db.storageengine.StorageEngine;
-import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -210,7 +210,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
lastEvent instanceof EnrichedEvent
? ((EnrichedEvent) lastEvent).coreReportMessage()
: lastEvent,
- ErrorHandlingUtils.getRootCause(e).getMessage()),
+ ErrorHandlingCommonUtils.getRootCause(e).getMessage()),
e);
} else {
LOGGER.info(
@@ -247,7 +247,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
LOGGER.info(
"Exception occurred when closing pipe processor subtask {}, root
cause: {}",
taskID,
- ErrorHandlingUtils.getRootCause(e).getMessage(),
+ ErrorHandlingCommonUtils.getRootCause(e).getMessage(),
e);
} finally {
// should be called after pipeProcessor.close()
@@ -291,7 +291,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
@Override
protected String getRootCause(final Throwable throwable) {
- return ErrorHandlingUtils.getRootCause(throwable).getMessage();
+ return ErrorHandlingCommonUtils.getRootCause(throwable).getMessage();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 73681d6f549..4b4794891f6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -19,13 +19,13 @@
package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
-import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
+import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -35,14 +35,12 @@ import
org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.metrics.type.Histogram;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
-import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,37 +132,8 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
decreaseReferenceCountAndReleaseLastEvent(event, true);
sleepInterval =
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
- } catch (final PipeNonReportException e) {
- sleep4NonReportException();
- } catch (final PipeException e) {
- if (!isClosed.get()) {
- setLastExceptionEvent(event);
- throw e;
- } else {
- LOGGER.info(
- "{} in pipe transfer, ignored because the connector subtask is
dropped.{}",
- e.getClass().getSimpleName(),
- e.getMessage() != null ? " Message: " + e.getMessage() : "");
- clearReferenceCountAndReleaseLastEvent(event);
- }
} catch (final Exception e) {
- if (!isClosed.get()) {
- setLastExceptionEvent(event);
- throw new PipeException(
- String.format(
- "Exception in pipe transfer, subtask: %s, last event: %s, root
cause: %s",
- taskID,
- event instanceof EnrichedEvent
- ? ((EnrichedEvent) event).coreReportMessage()
- : event,
- ErrorHandlingUtils.getRootCause(e).getMessage()),
- e);
- } else {
- LOGGER.info(
- "Exception in pipe transfer, ignored because the sink subtask is
dropped.{}",
- e.getMessage() != null ? " Message: " + e.getMessage() : "");
- clearReferenceCountAndReleaseLastEvent(event);
- }
+ handleException(event, e);
}
return true;
@@ -216,7 +185,7 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
LOGGER.info(
"Exception occurred when closing pipe connector subtask {}, root
cause: {}",
taskID,
- ErrorHandlingUtils.getRootCause(e).getMessage(),
+ ErrorHandlingCommonUtils.getRootCause(e).getMessage(),
e);
} finally {
inputPendingQueue.discardAllEvents();
@@ -377,11 +346,12 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
@Override
protected String getRootCause(final Throwable throwable) {
- return ErrorHandlingUtils.getRootCause(throwable).getMessage();
+ return ErrorHandlingCommonUtils.getRootCause(throwable).getMessage();
}
@Override
protected void report(final EnrichedEvent event, final PipeRuntimeException
exception) {
+ lastExceptionTime = Long.MAX_VALUE;
PipeDataNodeAgent.runtime().report(event, exception);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
index 91f5be62f11..282be05c259 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
@@ -212,7 +212,8 @@ public class PipeHistoricalDataRegionTsFileSource
implements PipeHistoricalDataR
// enabling the historical data extraction, which may affect the realtime
data extraction.
isHistoricalSourceEnabled =
parameters.getBooleanOrDefault(
- SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
SystemConstant.RESTART_DEFAULT_VALUE)
+ SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
+ SystemConstant.RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE)
|| parameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
index c7c16f20baf..146359bc215 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
+import static
org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause;
import static org.apache.iotdb.db.queryengine.execution.QueryState.ABORTED;
import static org.apache.iotdb.db.queryengine.execution.QueryState.CANCELED;
import static org.apache.iotdb.db.queryengine.execution.QueryState.DISPATCHING;
@@ -42,7 +43,6 @@ import static
org.apache.iotdb.db.queryengine.execution.QueryState.PENDING_RETRY
import static org.apache.iotdb.db.queryengine.execution.QueryState.PLANNED;
import static org.apache.iotdb.db.queryengine.execution.QueryState.QUEUED;
import static org.apache.iotdb.db.queryengine.execution.QueryState.RUNNING;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause;
/**
* State machine for a {@link QueryExecution}. It stores the states for the
{@link QueryExecution}.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
index cc00f094cfc..7772c4ceaee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.executor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.IConsensus;
@@ -33,7 +34,6 @@ import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo;
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion;
-import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -116,7 +116,7 @@ public class RegionReadExecutor {
RegionExecutionResult resp =
RegionExecutionResult.create(
false, String.format(ERROR_MSG_FORMAT, e.getMessage()), null);
- Throwable t = ErrorHandlingUtils.getRootCause(e);
+ Throwable t = ErrorHandlingCommonUtils.getRootCause(e);
if (t instanceof ReadException
|| t instanceof ReadIndexException
|| t instanceof NotLeaderException
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
index 381c9b9378a..5c369c8e137 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.queryengine.execution.schedule;
import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
import
org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTask;
-import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.db.utils.SetThreadName;
import org.slf4j.Logger;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
index 8225d2f6020..b300da7b26e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.statement.crud;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
@@ -163,9 +164,12 @@ public class InsertMultiTabletsStatement extends
InsertBaseStatement {
@Override
public String toString() {
+ final int size =
CommonDescriptor.getInstance().getConfig().getPathLogMaxSize();
return "InsertMultiTabletsStatement{"
+ "insertTabletStatementList="
- + insertTabletStatementList
+ + (Objects.nonNull(insertTabletStatementList) &&
insertTabletStatementList.size() > size
+ ? "(Partial) " + insertTabletStatementList.subList(0, size)
+ : insertTabletStatementList)
+ '}';
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 987eaaf1019..08e8325c787 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.utils;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.QueryInBatchStatementException;
import org.apache.iotdb.db.exception.StorageGroupNotReadyException;
@@ -69,7 +70,7 @@ public class ErrorHandlingUtils {
LOGGER.warn(ERROR_OPERATION_LOG, statusCode, operation, e);
}
if (e instanceof SemanticException) {
- Throwable rootCause = getRootCause(e);
+ Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e);
if (e.getCause() instanceof IoTDBException) {
return RpcUtils.getStatus(
((IoTDBException) e.getCause()).getErrorCode(),
rootCause.getMessage());
@@ -86,13 +87,6 @@ public class ErrorHandlingUtils {
return onNpeOrUnexpectedException(e, operation.getName(), statusCode);
}
- public static Throwable getRootCause(Throwable e) {
- while (e.getCause() != null) {
- e = e.getCause();
- }
- return e;
- }
-
public static TSStatus onQueryException(Exception e, String operation,
TSStatusCode statusCode) {
TSStatus status = tryCatchQueryException(e);
if (status != null) {
@@ -135,7 +129,7 @@ public class ErrorHandlingUtils {
}
private static TSStatus tryCatchQueryException(Exception e) {
- Throwable rootCause = getRootCause(e);
+ Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e);
// ignore logging sg not ready exception
if (rootCause instanceof StorageGroupNotReadyException) {
return RpcUtils.getStatus(TSStatusCode.STORAGE_ENGINE_NOT_READY,
rootCause.getMessage());
@@ -210,7 +204,7 @@ public class ErrorHandlingUtils {
LOGGER.warn(message, e);
return
RpcUtils.getStatus(Arrays.asList(batchException.getFailingStatus()));
} else if (e instanceof IoTDBException) {
- Throwable rootCause = getRootCause(e);
+ Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e);
// ignore logging sg not ready exception
if (!(rootCause instanceof StorageGroupNotReadyException)) {
LOGGER.warn(message, e);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkNonReportTimeConfigurableException.java
similarity index 53%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
copy to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkNonReportTimeConfigurableException.java
index 62b54334a93..eafa4f45a7f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkNonReportTimeConfigurableException.java
@@ -19,24 +19,48 @@
package org.apache.iotdb.commons.exception.pipe;
-public class PipeRuntimeSinkRetryTimesConfigurableException
+import java.util.Objects;
+
+public class PipeRuntimeSinkNonReportTimeConfigurableException
extends PipeRuntimeSinkCriticalException {
- private final int retryTimes;
+ private final long interval;
- public PipeRuntimeSinkRetryTimesConfigurableException(
- final String message, final int retryTimes) {
+ public PipeRuntimeSinkNonReportTimeConfigurableException(
+ final String message, final long interval) {
super(message);
- this.retryTimes = retryTimes;
+ this.interval = interval;
}
- public int getRetryTimes() {
- return retryTimes;
+ public long getInterval() {
+ return interval;
}
// We do not record the timestamp here for logger reduction detection
@Override
public String toString() {
- return "PipeRuntimeSinkRetryTimesConfigurableException{" + "message='" +
getMessage() + "}";
+ return "PipeRuntimeSinkNonReportTimeConfigurableException{"
+ + "message='"
+ + "', interval='"
+ + interval
+ + "'}";
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final PipeRuntimeSinkNonReportTimeConfigurableException that =
+ (PipeRuntimeSinkNonReportTimeConfigurableException) o;
+ return super.equals(that) && interval == that.interval;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), interval);
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
index 62b54334a93..aa64e533528 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.commons.exception.pipe;
+import java.util.Objects;
+
public class PipeRuntimeSinkRetryTimesConfigurableException
extends PipeRuntimeSinkCriticalException {
@@ -37,6 +39,29 @@ public class PipeRuntimeSinkRetryTimesConfigurableException
// We do not record the timestamp here for logger reduction detection
@Override
public String toString() {
- return "PipeRuntimeSinkRetryTimesConfigurableException{" + "message='" +
getMessage() + "}";
+ return "PipeRuntimeSinkRetryTimesConfigurableException{"
+ + "message='"
+ + getMessage()
+ + "', retryTimes='"
+ + retryTimes
+ + "'}";
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final PipeRuntimeSinkRetryTimesConfigurableException that =
+ (PipeRuntimeSinkRetryTimesConfigurableException) o;
+ return super.equals(that) && retryTimes == that.retryTimes;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), retryTimes);
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 87bb0c95949..56e142d6efd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -20,12 +20,15 @@
package org.apache.iotdb.commons.pipe.agent.task.subtask;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkCriticalException;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
import org.apache.iotdb.commons.pipe.agent.task.execution.PipeSubtaskScheduler;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -54,6 +57,7 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
protected volatile Event lastExceptionEvent;
protected long sleepInterval =
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
+ protected long lastExceptionTime = Long.MAX_VALUE;
protected PipeAbstractSinkSubtask(
final String taskID, final long creationTime, final PipeConnector
outputPipeConnector) {
@@ -261,4 +265,52 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
Thread.currentThread().interrupt();
}
}
+
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
+ protected void handleException(final Event event, final Exception e) {
+ if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException) {
+ if (lastExceptionTime == Long.MAX_VALUE) {
+ lastExceptionTime = System.currentTimeMillis();
+ }
+ if (System.currentTimeMillis() - lastExceptionTime
+ < ((PipeRuntimeSinkNonReportTimeConfigurableException)
e).getInterval()) {
+ sleep4NonReportException();
+ return;
+ }
+ handlePipeException(event, (PipeException) e);
+ } else if (e instanceof PipeException) {
+ handlePipeException(event, (PipeException) e);
+ } else {
+ if (!isClosed.get()) {
+ setLastExceptionEvent(event);
+ throw new PipeException(
+ String.format(
+ "Exception in pipe transfer, subtask: %s, last event: %s, root
cause: %s",
+ taskID,
+ event instanceof EnrichedEvent
+ ? ((EnrichedEvent) event).coreReportMessage()
+ : event,
+ ErrorHandlingCommonUtils.getRootCause(e).getMessage()),
+ e);
+ } else {
+ LOGGER.info(
+ "Exception in pipe transfer, ignored because the sink subtask is
dropped.{}",
+ e.getMessage() != null ? " Message: " + e.getMessage() : "");
+ clearReferenceCountAndReleaseLastEvent(event);
+ }
+ }
+ }
+
+ protected void handlePipeException(final Event event, final PipeException e)
{
+ if (!isClosed.get()) {
+ setLastExceptionEvent(event);
+ throw e;
+ } else {
+ LOGGER.info(
+ "{} in pipe transfer, ignored because the connector subtask is
dropped.{}",
+ e.getClass().getSimpleName(),
+ e.getMessage() != null ? " Message: " + e.getMessage() : "");
+ clearReferenceCountAndReleaseLastEvent(event);
+ }
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
index 9c1bc1b521c..a4119979e76 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
@@ -23,7 +23,7 @@ public class SystemConstant {
// This can be arbitrarily changed since it's only a memory key and not
stored
public static final String RESTART_OR_NEWLY_ADDED_KEY =
"__system.restart_or_newly_added";
- public static final boolean RESTART_DEFAULT_VALUE = false;
+ public static final boolean RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE = false;
private SystemConstant() {
throw new IllegalStateException("Utility class");
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 21597702ae5..d48a9fdbcb2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -20,14 +20,11 @@
package org.apache.iotdb.commons.pipe.receiver;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
-import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
-import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -50,8 +47,6 @@ public class PipeReceiverStatusHandler {
private static final String UNCLASSIFIED_EXCEPTION = "Unclassified
exception";
private static final String NO_PERMISSION_STR = "No permissions for this
operation";
- private static final int CONFLICT_RETRY_MAX_TIMES = 100;
-
private final boolean isRetryAllowedWhenConflictOccurs;
private final long retryMaxMillisWhenConflictOccurs;
private final boolean shouldRecordIgnoredDataWhenConflictOccurs;
@@ -97,7 +92,7 @@ public class PipeReceiverStatusHandler {
* exception if retry the {@link Event}. Upper class must ensure that the
method is invoked only
* by a single thread.
*
- * @throws PipeException to retry the current {@link Event}
+ * @throws PipeRuntimeSinkNonReportTimeConfigurableException to retry the
current {@link Event}
* @param status the {@link TSStatus} to judge
* @param exceptionMessage The exception message to throw
* @param recordMessage The message to record an ignored {@link Event}, the
caller should assure
@@ -128,7 +123,8 @@ public class PipeReceiverStatusHandler {
LOGGER::info,
"Temporary unavailable exception: will retry forever. status:
%s",
status);
- throw new PipeNonReportException(exceptionMessage);
+ throw new PipeRuntimeSinkNonReportTimeConfigurableException(
+ exceptionMessage, Long.MAX_VALUE);
}
case 1810: // PIPE_RECEIVER_USER_CONFLICT_EXCEPTION
@@ -167,16 +163,12 @@ public class PipeReceiverStatusHandler {
+ " seconds",
status);
exceptionEventHasBeenRetried.set(true);
- throw status.getCode() == 1815
- &&
PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict()
- ? new PipeNonReportException(exceptionMessage)
- : new PipeRuntimeSinkRetryTimesConfigurableException(
- exceptionMessage,
- (int)
- Math.max(
- PipeSubtask.MAX_RETRY_TIMES,
- Math.min(
- CONFLICT_RETRY_MAX_TIMES,
retryMaxMillisWhenConflictOccurs * 1.1)));
+ throw new PipeRuntimeSinkNonReportTimeConfigurableException(
+ exceptionMessage,
+ status.getCode() == 1815
+ &&
PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict()
+ ? Long.MAX_VALUE
+ : retryMaxMillisWhenConflictOccurs);
}
case 803: // NO_PERMISSION
@@ -252,12 +244,8 @@ public class PipeReceiverStatusHandler {
}
exceptionEventHasBeenRetried.set(true);
- throw new PipeRuntimeSinkRetryTimesConfigurableException(
- exceptionMessage,
- (int)
- Math.max(
- PipeSubtask.MAX_RETRY_TIMES,
- Math.min(CONFLICT_RETRY_MAX_TIMES,
retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
+ throw new PipeRuntimeSinkNonReportTimeConfigurableException(
+ exceptionMessage, retryMaxMillisWhenOtherExceptionsOccur);
}
private static String getNoPermission(final boolean noPermission) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeNonReportException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ErrorHandlingCommonUtils.java
similarity index 70%
rename from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeNonReportException.java
rename to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ErrorHandlingCommonUtils.java
index 572a7764518..01b8b64442b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeNonReportException.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ErrorHandlingCommonUtils.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -17,11 +17,17 @@
* under the License.
*/
-package org.apache.iotdb.commons.exception.pipe;
+package org.apache.iotdb.commons.utils;
-public class PipeNonReportException extends PipeRuntimeNonCriticalException {
+public class ErrorHandlingCommonUtils {
+ public static Throwable getRootCause(Throwable e) {
+ while (e.getCause() != null) {
+ e = e.getCause();
+ }
+ return e;
+ }
- public PipeNonReportException(final String message) {
- super(message);
+ private ErrorHandlingCommonUtils() {
+ // Utility class
}
}