This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3a6322028cd fix (#16702)
3a6322028cd is described below
commit 3a6322028cd6026df7ebc8a97020e45fc5d8bf00
Author: Caideyipi <[email protected]>
AuthorDate: Mon Nov 17 11:20:16 2025 +0800
fix (#16702)
---
.../pipe/receiver/PipeReceiverStatusHandler.java | 134 ++++++++++-----------
1 file changed, 64 insertions(+), 70 deletions(-)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 350746d7b8e..9fe87b22fd7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -45,6 +45,9 @@ import java.util.concurrent.atomic.AtomicReference;
public class PipeReceiverStatusHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeReceiverStatusHandler.class);
+ private static final String NO_PERMISSION = "No permission";
+ private static final String UNCLASSIFIED_EXCEPTION = "Unclassified
exception";
+ private static final String NO_PERMISSION_STR = "No permissions for this
operation";
private static final int CONFLICT_RETRY_MAX_TIMES = 100;
@@ -183,83 +186,74 @@ public class PipeReceiverStatusHandler {
if (skipIfNoPrivileges) {
return;
}
-
- synchronized (this) {
- recordExceptionStatusIfNecessary(recordMessage);
-
- if (exceptionEventHasBeenRetried.get()
- && System.currentTimeMillis() -
exceptionFirstEncounteredTime.get()
- > retryMaxMillisWhenOtherExceptionsOccur) {
- LOGGER.warn(
- "No permission: retry timeout. will be ignored. event: {}.
status: {}",
- shouldRecordIgnoredDataWhenOtherExceptionsOccur ?
recordMessage : "not recorded",
- status);
- resetExceptionStatus();
+ handleOtherExceptions(status, exceptionMessage, recordMessage, true);
+ break;
+ case 305:
+ handleOtherExceptions(status, exceptionMessage, recordMessage, false);
+ break;
+ default:
+ // Some auth error may be wrapped in other codes
+ if (exceptionMessage.contains(NO_PERMISSION_STR)) {
+ if (skipIfNoPrivileges) {
return;
}
-
- // Reduce the log if retry forever
- if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
- PipeLogger.log(LOGGER::warn, "No permission: will retry forever.
status: %s", status);
- } else {
- LOGGER.warn(
- "No permission: will retry for at least {} seconds. status:
{}",
- (retryMaxMillisWhenOtherExceptionsOccur
- + exceptionFirstEncounteredTime.get()
- - System.currentTimeMillis())
- / 1000.0,
- status);
- }
-
- exceptionEventHasBeenRetried.set(true);
- throw new PipeRuntimeSinkRetryTimesConfigurableException(
- exceptionMessage,
- (int)
- Math.max(
- PipeSubtask.MAX_RETRY_TIMES,
- Math.min(
- CONFLICT_RETRY_MAX_TIMES,
retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
+ handleOtherExceptions(status, exceptionMessage, recordMessage, true);
+ break;
}
+ // Other exceptions
+ handleOtherExceptions(status, exceptionMessage, recordMessage, false);
+ break;
+ }
+ }
- default: // Other exceptions
- synchronized (this) {
- recordExceptionStatusIfNecessary(recordMessage);
+ private synchronized void handleOtherExceptions(
+ final TSStatus status,
+ final String exceptionMessage,
+ final String recordMessage,
+ final boolean noPermission) {
+ recordExceptionStatusIfNecessary(recordMessage);
+
+ if (exceptionEventHasBeenRetried.get()
+ && System.currentTimeMillis() - exceptionFirstEncounteredTime.get()
+ > retryMaxMillisWhenOtherExceptionsOccur) {
+ LOGGER.warn(
+ "{}: retry timeout. will be ignored. event: {}. status: {}",
+ getNoPermission(noPermission),
+ shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage :
"not recorded",
+ status);
+ resetExceptionStatus();
+ return;
+ }
- if (exceptionEventHasBeenRetried.get()
- && System.currentTimeMillis() -
exceptionFirstEncounteredTime.get()
- > retryMaxMillisWhenOtherExceptionsOccur) {
- LOGGER.warn(
- "Unclassified exception: retry timeout. will be ignored.
event: {}. status: {}",
- shouldRecordIgnoredDataWhenOtherExceptionsOccur ?
recordMessage : "not recorded",
- status);
- resetExceptionStatus();
- return;
- }
+ // Reduce the log if retry forever
+ if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
+ PipeLogger.log(
+ LOGGER::warn,
+ "%s: will retry forever. status: %s",
+ getNoPermission(noPermission),
+ status);
+ } else {
+ LOGGER.warn(
+ "{}: will retry for at least {} seconds. status: {}",
+ getNoPermission(noPermission),
+ (retryMaxMillisWhenOtherExceptionsOccur
+ + exceptionFirstEncounteredTime.get()
+ - System.currentTimeMillis())
+ / 1000.0,
+ status);
+ }
- // Reduce the log if retry forever
- if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
- PipeLogger.log(
- LOGGER::warn, "Unclassified exception: will retry forever.
status: %s", status);
- } else {
- LOGGER.warn(
- "Unclassified exception: will retry for at least {} seconds.
status: {}",
- (retryMaxMillisWhenOtherExceptionsOccur
- + exceptionFirstEncounteredTime.get()
- - System.currentTimeMillis())
- / 1000.0,
- status);
- }
+ exceptionEventHasBeenRetried.set(true);
+ throw new PipeRuntimeSinkRetryTimesConfigurableException(
+ exceptionMessage,
+ (int)
+ Math.max(
+ PipeSubtask.MAX_RETRY_TIMES,
+ Math.min(CONFLICT_RETRY_MAX_TIMES,
retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
+ }
- exceptionEventHasBeenRetried.set(true);
- throw new PipeRuntimeSinkRetryTimesConfigurableException(
- exceptionMessage,
- (int)
- Math.max(
- PipeSubtask.MAX_RETRY_TIMES,
- Math.min(
- CONFLICT_RETRY_MAX_TIMES,
retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
- }
- }
+ private static String getNoPermission(final boolean noPermission) {
+ return noPermission ? NO_PERMISSION : UNCLASSIFIED_EXCEPTION;
}
private void recordExceptionStatusIfNecessary(final String message) {