This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch fix-pipe-auth-retry-interval
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 66acc324b0ae8e8e18bcd829cd27908cba4103bf
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 29 10:23:16 2026 +0800

    Avoid login lock for pipe auth retry
---
 .../sink/protocol/legacy/IoTDBLegacyPipeSink.java  |  7 +-
 .../task/subtask/PipeAbstractSinkSubtask.java      |  9 ++-
 .../agent/task/subtask/PipeReportableSubtask.java  | 40 +++++++++-
 .../pipe/sink/client/IoTDBSyncClientManager.java   |  6 +-
 .../commons/pipe/task/PipeSleepIntervalTest.java   | 86 ++++++++++++++++++----
 5 files changed, 127 insertions(+), 21 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
index c0a9eb6a79d..24544841688 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
@@ -278,8 +278,11 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
     if (openSessionResp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       final String errorMsg =
           String.format(
-              "Failed to login to receiver %s:%s for legacy pipe transfer 
because %s",
-              ipAddress, port, openSessionResp.getStatus().getMessage());
+              "Failed to login to receiver %s:%s for legacy pipe transfer 
because code: %d, message: %s",
+              ipAddress,
+              port,
+              openSessionResp.getStatus().getCode(),
+              openSessionResp.getStatus().getMessage());
       LOGGER.warn(errorMsg);
       throw new PipeRuntimeCriticalException(errorMsg);
     }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 9dbf5af2d0c..610fc12ff4e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -177,7 +177,7 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
             MAX_RETRY_TIMES,
             e);
         try {
-          sleepIfNoHighPriorityTask(retry * 
PipeConfig.getInstance().getPipeSinkRetryIntervalMs());
+          sleepIfNoHighPriorityTask(getHandshakeRetrySleepInterval(e, retry));
         } catch (final InterruptedException interruptedException) {
           LOGGER.info(
               PipeMessages.INTERRUPTED_WHILE_SLEEPING_RETRY_HANDSHAKE, 
interruptedException);
@@ -218,6 +218,13 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
     return false;
   }
 
+  private long getHandshakeRetrySleepInterval(final Throwable throwable, final 
int retry) {
+    final long defaultInterval = retry * 
PipeConfig.getInstance().getPipeSinkRetryIntervalMs();
+    return isAuthenticationFailure(throwable)
+        ? Math.max(defaultInterval, AUTHENTICATION_FAILURE_RETRY_INTERVAL_MS)
+        : defaultInterval;
+  }
+
   /**
    * Submit a {@link PipeSubtask} to the executor to keep it running. Note 
that the function will be
    * called when connector starts or the subTask finishes the last round, Thus 
the {@link
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index 52bc3bd61f8..51693d944f6 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -27,15 +27,33 @@ import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigur
 import org.apache.iotdb.commons.i18n.PipeMessages;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
 
 public abstract class PipeReportableSubtask extends PipeSubtask {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeReportableSubtask.class);
+  private static final long DEFAULT_LOGIN_LOCK_WINDOW_MS = 
TimeUnit.MINUTES.toMillis(10);
+  private static final int DEFAULT_LOGIN_LOCK_FAILED_ATTEMPTS = 5;
+  private static final int AUTHENTICATION_FAILURE_IMMEDIATE_ATTEMPTS = 2;
+  protected static final long AUTHENTICATION_FAILURE_RETRY_INTERVAL_MS =
+      DEFAULT_LOGIN_LOCK_WINDOW_MS
+              / (DEFAULT_LOGIN_LOCK_FAILED_ATTEMPTS - 
AUTHENTICATION_FAILURE_IMMEDIATE_ATTEMPTS)
+          + TimeUnit.SECONDS.toMillis(1);
+  private static final Pattern AUTHENTICATION_FAILURE_STATUS_CODE_PATTERN =
+      Pattern.compile(
+          String.format(
+              "(?i)(?:\\b(?:code|status 
code)\\s*[:=]\\s*(?:%d|%d)\\b|\\b(?:%d|%d):|\\b(?:WRONG_LOGIN_PASSWORD|USER_LOGIN_LOCKED)\\b)",
+              TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode(),
+              TSStatusCode.USER_LOGIN_LOCKED.getStatusCode(),
+              TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode(),
+              TSStatusCode.USER_LOGIN_LOCKED.getStatusCode()));
   // To ensure that high-priority tasks can obtain object locks first, a 
counter is now used to save
   // the number of high-priority tasks.
   protected final AtomicLong highPriorityLockTaskCount = new AtomicLong(0);
@@ -65,7 +83,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
     // is dropped or the process is running normally.
   }
 
-  private long getSleepIntervalBasedOnThrowable(final Throwable throwable) {
+  protected long getSleepIntervalBasedOnThrowable(final Throwable throwable) {
     long sleepInterval = Math.min(1000L * retryCount.get(), 10000);
     // if receiver is read-only/internal-error/write-reject, connector will 
retry with
     // power-increasing interval
@@ -76,9 +94,24 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
         sleepInterval = 1000L * retryCount.get() * retryCount.get();
       }
     }
+    if (isAuthenticationFailure(throwable)) {
+      sleepInterval = Math.max(sleepInterval, 
AUTHENTICATION_FAILURE_RETRY_INTERVAL_MS);
+    }
     return sleepInterval;
   }
 
+  protected static boolean isAuthenticationFailure(final Throwable throwable) {
+    Throwable current = throwable;
+    while (current != null) {
+      final String message = current.getMessage();
+      if (message != null && 
AUTHENTICATION_FAILURE_STATUS_CODE_PATTERN.matcher(message).find()) {
+        return true;
+      }
+      current = current.getCause();
+    }
+    return false;
+  }
+
   private void onReportEventFailure(final Throwable throwable) {
     final int maxRetryTimes =
         throwable instanceof PipeRuntimeSinkRetryTimesConfigurableException
@@ -199,10 +232,13 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
   }
 
   protected void sleepIfNoHighPriorityTask(long sleepMillis) throws 
InterruptedException {
+    if (sleepMillis <= 0) {
+      return;
+    }
     synchronized (highPriorityLockTaskCount) {
       // The wait operation will release the highPriorityLockTaskCount lock, 
so there will be
       // no deadlock.
-      if (highPriorityLockTaskCount.get() > 0) {
+      if (highPriorityLockTaskCount.get() == 0) {
         highPriorityLockTaskCount.wait(sleepMillis);
       }
     }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
index ff62dbf477b..c6ad14aac4d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
@@ -270,7 +270,11 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
             client.getIpAddress(),
             client.getPort(),
             resp.getStatus());
-        endPoint2HandshakeErrorMessage.put(client.getEndPoint(), 
resp.getStatus().getMessage());
+        endPoint2HandshakeErrorMessage.put(
+            client.getEndPoint(),
+            String.format(
+                "code: %d, message: %s",
+                resp.getStatus().getCode(), resp.getStatus().getMessage()));
       } else {
         clientAndStatus.setRight(true);
         client.setTimeout(CONNECTION_TIMEOUT_MS.get());
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java
index 07ae50e992e..a9b72391b2b 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java
@@ -25,13 +25,48 @@ import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import 
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.concurrent.TimeUnit;
+
 public class PipeSleepIntervalTest {
+  private static class TestSinkSubtask extends PipeAbstractSinkSubtask {
+
+    TestSinkSubtask() {
+      super(null, 0, null);
+    }
+
+    @Override
+    protected String getRootCause(Throwable throwable) {
+      return null;
+    }
+
+    @Override
+    protected void report(EnrichedEvent event, PipeRuntimeException exception) 
{}
+
+    @Override
+    protected boolean executeOnce() {
+      return false;
+    }
+
+    long getSleepInterval(final Throwable throwable) {
+      return getSleepIntervalBasedOnThrowable(throwable);
+    }
+
+    boolean isAuthenticationFailureException(final Throwable throwable) {
+      return isAuthenticationFailure(throwable);
+    }
+
+    void sleepWithoutHighPriorityTask(final long sleepMillis) throws 
InterruptedException {
+      sleepIfNoHighPriorityTask(sleepMillis);
+    }
+  }
+
   private long oldPipeSinkSubtaskSleepIntervalInitMs;
   private long oldPipeSinkSubtaskSleepIntervalMaxMs;
 
@@ -53,21 +88,7 @@ public class PipeSleepIntervalTest {
 
   @Test
   public void test() {
-    try (final PipeAbstractSinkSubtask subtask =
-        new PipeAbstractSinkSubtask(null, 0, null) {
-          @Override
-          protected String getRootCause(Throwable throwable) {
-            return null;
-          }
-
-          @Override
-          protected void report(EnrichedEvent event, PipeRuntimeException 
exception) {}
-
-          @Override
-          protected boolean executeOnce() {
-            return false;
-          }
-        }) {
+    try (final TestSinkSubtask subtask = new TestSinkSubtask()) {
       long startTime = System.currentTimeMillis();
       subtask.sleep4NonReportException();
       Assert.assertTrue(
@@ -80,4 +101,39 @@ public class PipeSleepIntervalTest {
               >= 
PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs());
     }
   }
+
+  @Test
+  public void testAuthenticationFailureRetryInterval() {
+    try (final TestSinkSubtask subtask = new TestSinkSubtask()) {
+      Assert.assertTrue(
+          subtask.isAuthenticationFailureException(
+              new PipeConnectionException(
+                  "Handshake error with receiver 127.0.0.1:6667, code: 801, 
message: Authentication failed.")));
+      Assert.assertTrue(
+          subtask.isAuthenticationFailureException(
+              new PipeConnectionException("801: Failed to check password for 
pipe a2b.")));
+      Assert.assertTrue(
+          subtask.isAuthenticationFailureException(
+              new PipeConnectionException("status code: 822, message: Account 
is blocked.")));
+      Assert.assertFalse(
+          subtask.isAuthenticationFailureException(
+              new PipeConnectionException("Network error 801 bytes sent.")));
+
+      final long authenticationFailureRetryInterval =
+          subtask.getSleepInterval(new PipeConnectionException("code: 801"));
+      Assert.assertTrue(authenticationFailureRetryInterval > 
TimeUnit.MINUTES.toMillis(3));
+      Assert.assertTrue(authenticationFailureRetryInterval * 3 > 
TimeUnit.MINUTES.toMillis(10));
+      Assert.assertTrue(
+          subtask.getSleepInterval(new PipeConnectionException("network 
error")) <= 10000);
+    }
+  }
+
+  @Test
+  public void testSleepIfNoHighPriorityTaskWaits() throws Exception {
+    try (final TestSinkSubtask subtask = new TestSinkSubtask()) {
+      final long startTime = System.currentTimeMillis();
+      subtask.sleepWithoutHighPriorityTask(20L);
+      Assert.assertTrue(System.currentTimeMillis() - startTime >= 15L);
+    }
+  }
 }

Reply via email to