This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch force_ci/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 71976f5ba5eebb3e6cccf055fbedcad3501c9db9 Author: Caideyipi <[email protected]> AuthorDate: Mon Nov 17 11:20:16 2025 +0800 fix (#16702) (cherry picked from commit 3a6322028cd6026df7ebc8a97020e45fc5d8bf00) --- .../pipe/receiver/PipeReceiverStatusHandler.java | 134 ++++++++++----------- 1 file changed, 64 insertions(+), 70 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java index 350746d7b8e..9fe87b22fd7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java @@ -45,6 +45,9 @@ import java.util.concurrent.atomic.AtomicReference; public class PipeReceiverStatusHandler { private static final Logger LOGGER = LoggerFactory.getLogger(PipeReceiverStatusHandler.class); + private static final String NO_PERMISSION = "No permission"; + private static final String UNCLASSIFIED_EXCEPTION = "Unclassified exception"; + private static final String NO_PERMISSION_STR = "No permissions for this operation"; private static final int CONFLICT_RETRY_MAX_TIMES = 100; @@ -183,83 +186,74 @@ public class PipeReceiverStatusHandler { if (skipIfNoPrivileges) { return; } - - synchronized (this) { - recordExceptionStatusIfNecessary(recordMessage); - - if (exceptionEventHasBeenRetried.get() - && System.currentTimeMillis() - exceptionFirstEncounteredTime.get() - > retryMaxMillisWhenOtherExceptionsOccur) { - LOGGER.warn( - "No permission: retry timeout. will be ignored. event: {}. status: {}", - shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : "not recorded", - status); - resetExceptionStatus(); + handleOtherExceptions(status, exceptionMessage, recordMessage, true); + break; + case 305: + handleOtherExceptions(status, exceptionMessage, recordMessage, false); + break; + default: + // Some auth error may be wrapped in other codes + if (exceptionMessage.contains(NO_PERMISSION_STR)) { + if (skipIfNoPrivileges) { return; } - - // Reduce the log if retry forever - if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) { - PipeLogger.log(LOGGER::warn, "No permission: will retry forever. status: %s", status); - } else { - LOGGER.warn( - "No permission: will retry for at least {} seconds. status: {}", - (retryMaxMillisWhenOtherExceptionsOccur - + exceptionFirstEncounteredTime.get() - - System.currentTimeMillis()) - / 1000.0, - status); - } - - exceptionEventHasBeenRetried.set(true); - throw new PipeRuntimeSinkRetryTimesConfigurableException( - exceptionMessage, - (int) - Math.max( - PipeSubtask.MAX_RETRY_TIMES, - Math.min( - CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenOtherExceptionsOccur * 1.1))); + handleOtherExceptions(status, exceptionMessage, recordMessage, true); + break; } + // Other exceptions + handleOtherExceptions(status, exceptionMessage, recordMessage, false); + break; + } + } - default: // Other exceptions - synchronized (this) { - recordExceptionStatusIfNecessary(recordMessage); + private synchronized void handleOtherExceptions( + final TSStatus status, + final String exceptionMessage, + final String recordMessage, + final boolean noPermission) { + recordExceptionStatusIfNecessary(recordMessage); + + if (exceptionEventHasBeenRetried.get() + && System.currentTimeMillis() - exceptionFirstEncounteredTime.get() + > retryMaxMillisWhenOtherExceptionsOccur) { + LOGGER.warn( + "{}: retry timeout. will be ignored. event: {}. status: {}", + getNoPermission(noPermission), + shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : "not recorded", + status); + resetExceptionStatus(); + return; + } - if (exceptionEventHasBeenRetried.get() - && System.currentTimeMillis() - exceptionFirstEncounteredTime.get() - > retryMaxMillisWhenOtherExceptionsOccur) { - LOGGER.warn( - "Unclassified exception: retry timeout. will be ignored. event: {}. status: {}", - shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : "not recorded", - status); - resetExceptionStatus(); - return; - } + // Reduce the log if retry forever + if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) { + PipeLogger.log( + LOGGER::warn, + "%s: will retry forever. status: %s", + getNoPermission(noPermission), + status); + } else { + LOGGER.warn( + "{}: will retry for at least {} seconds. status: {}", + getNoPermission(noPermission), + (retryMaxMillisWhenOtherExceptionsOccur + + exceptionFirstEncounteredTime.get() + - System.currentTimeMillis()) + / 1000.0, + status); + } - // Reduce the log if retry forever - if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) { - PipeLogger.log( - LOGGER::warn, "Unclassified exception: will retry forever. status: %s", status); - } else { - LOGGER.warn( - "Unclassified exception: will retry for at least {} seconds. status: {}", - (retryMaxMillisWhenOtherExceptionsOccur - + exceptionFirstEncounteredTime.get() - - System.currentTimeMillis()) - / 1000.0, - status); - } + exceptionEventHasBeenRetried.set(true); + throw new PipeRuntimeSinkRetryTimesConfigurableException( + exceptionMessage, + (int) + Math.max( + PipeSubtask.MAX_RETRY_TIMES, + Math.min(CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenOtherExceptionsOccur * 1.1))); + } - exceptionEventHasBeenRetried.set(true); - throw new PipeRuntimeSinkRetryTimesConfigurableException( - exceptionMessage, - (int) - Math.max( - PipeSubtask.MAX_RETRY_TIMES, - Math.min( - CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenOtherExceptionsOccur * 1.1))); - } - } + private static String getNoPermission(final boolean noPermission) { + return noPermission ? NO_PERMISSION : UNCLASSIFIED_EXCEPTION; } private void recordExceptionStatusIfNecessary(final String message) {
