This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch force_ci/object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 71976f5ba5eebb3e6cccf055fbedcad3501c9db9
Author: Caideyipi <[email protected]>
AuthorDate: Mon Nov 17 11:20:16 2025 +0800

    fix (#16702)
    
    (cherry picked from commit 3a6322028cd6026df7ebc8a97020e45fc5d8bf00)
---
 .../pipe/receiver/PipeReceiverStatusHandler.java   | 134 ++++++++++-----------
 1 file changed, 64 insertions(+), 70 deletions(-)

diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 350746d7b8e..9fe87b22fd7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -45,6 +45,9 @@ import java.util.concurrent.atomic.AtomicReference;
 public class PipeReceiverStatusHandler {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeReceiverStatusHandler.class);
+  private static final String NO_PERMISSION = "No permission";
+  private static final String UNCLASSIFIED_EXCEPTION = "Unclassified 
exception";
+  private static final String NO_PERMISSION_STR = "No permissions for this 
operation";
 
   private static final int CONFLICT_RETRY_MAX_TIMES = 100;
 
@@ -183,83 +186,74 @@ public class PipeReceiverStatusHandler {
         if (skipIfNoPrivileges) {
           return;
         }
-
-        synchronized (this) {
-          recordExceptionStatusIfNecessary(recordMessage);
-
-          if (exceptionEventHasBeenRetried.get()
-              && System.currentTimeMillis() - 
exceptionFirstEncounteredTime.get()
-                  > retryMaxMillisWhenOtherExceptionsOccur) {
-            LOGGER.warn(
-                "No permission: retry timeout. will be ignored. event: {}. 
status: {}",
-                shouldRecordIgnoredDataWhenOtherExceptionsOccur ? 
recordMessage : "not recorded",
-                status);
-            resetExceptionStatus();
+        handleOtherExceptions(status, exceptionMessage, recordMessage, true);
+        break;
+      case 305:
+        handleOtherExceptions(status, exceptionMessage, recordMessage, false);
+        break;
+      default:
+        // Some auth error may be wrapped in other codes
+        if (exceptionMessage.contains(NO_PERMISSION_STR)) {
+          if (skipIfNoPrivileges) {
             return;
           }
-
-          // Reduce the log if retry forever
-          if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
-            PipeLogger.log(LOGGER::warn, "No permission: will retry forever. 
status: %s", status);
-          } else {
-            LOGGER.warn(
-                "No permission: will retry for at least {} seconds. status: 
{}",
-                (retryMaxMillisWhenOtherExceptionsOccur
-                        + exceptionFirstEncounteredTime.get()
-                        - System.currentTimeMillis())
-                    / 1000.0,
-                status);
-          }
-
-          exceptionEventHasBeenRetried.set(true);
-          throw new PipeRuntimeSinkRetryTimesConfigurableException(
-              exceptionMessage,
-              (int)
-                  Math.max(
-                      PipeSubtask.MAX_RETRY_TIMES,
-                      Math.min(
-                          CONFLICT_RETRY_MAX_TIMES, 
retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
+          handleOtherExceptions(status, exceptionMessage, recordMessage, true);
+          break;
         }
+        // Other exceptions
+        handleOtherExceptions(status, exceptionMessage, recordMessage, false);
+        break;
+    }
+  }
 
-      default: // Other exceptions
-        synchronized (this) {
-          recordExceptionStatusIfNecessary(recordMessage);
+  private synchronized void handleOtherExceptions(
+      final TSStatus status,
+      final String exceptionMessage,
+      final String recordMessage,
+      final boolean noPermission) {
+    recordExceptionStatusIfNecessary(recordMessage);
+
+    if (exceptionEventHasBeenRetried.get()
+        && System.currentTimeMillis() - exceptionFirstEncounteredTime.get()
+            > retryMaxMillisWhenOtherExceptionsOccur) {
+      LOGGER.warn(
+          "{}: retry timeout. will be ignored. event: {}. status: {}",
+          getNoPermission(noPermission),
+          shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : 
"not recorded",
+          status);
+      resetExceptionStatus();
+      return;
+    }
 
-          if (exceptionEventHasBeenRetried.get()
-              && System.currentTimeMillis() - 
exceptionFirstEncounteredTime.get()
-                  > retryMaxMillisWhenOtherExceptionsOccur) {
-            LOGGER.warn(
-                "Unclassified exception: retry timeout. will be ignored. 
event: {}. status: {}",
-                shouldRecordIgnoredDataWhenOtherExceptionsOccur ? 
recordMessage : "not recorded",
-                status);
-            resetExceptionStatus();
-            return;
-          }
+    // Reduce the log if retry forever
+    if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
+      PipeLogger.log(
+          LOGGER::warn,
+          "%s: will retry forever. status: %s",
+          getNoPermission(noPermission),
+          status);
+    } else {
+      LOGGER.warn(
+          "{}: will retry for at least {} seconds. status: {}",
+          getNoPermission(noPermission),
+          (retryMaxMillisWhenOtherExceptionsOccur
+                  + exceptionFirstEncounteredTime.get()
+                  - System.currentTimeMillis())
+              / 1000.0,
+          status);
+    }
 
-          // Reduce the log if retry forever
-          if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
-            PipeLogger.log(
-                LOGGER::warn, "Unclassified exception: will retry forever. 
status: %s", status);
-          } else {
-            LOGGER.warn(
-                "Unclassified exception: will retry for at least {} seconds. 
status: {}",
-                (retryMaxMillisWhenOtherExceptionsOccur
-                        + exceptionFirstEncounteredTime.get()
-                        - System.currentTimeMillis())
-                    / 1000.0,
-                status);
-          }
+    exceptionEventHasBeenRetried.set(true);
+    throw new PipeRuntimeSinkRetryTimesConfigurableException(
+        exceptionMessage,
+        (int)
+            Math.max(
+                PipeSubtask.MAX_RETRY_TIMES,
+                Math.min(CONFLICT_RETRY_MAX_TIMES, 
retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
+  }
 
-          exceptionEventHasBeenRetried.set(true);
-          throw new PipeRuntimeSinkRetryTimesConfigurableException(
-              exceptionMessage,
-              (int)
-                  Math.max(
-                      PipeSubtask.MAX_RETRY_TIMES,
-                      Math.min(
-                          CONFLICT_RETRY_MAX_TIMES, 
retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
-        }
-    }
+  private static String getNoPermission(final boolean noPermission) {
+    return noPermission ? NO_PERMISSION : UNCLASSIFIED_EXCEPTION;
   }
 
   private void recordExceptionStatusIfNecessary(final String message) {

Reply via email to