This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch fix-pipe-auth-retry-interval in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 66acc324b0ae8e8e18bcd829cd27908cba4103bf Author: Caideyipi <[email protected]> AuthorDate: Mon Jun 29 10:23:16 2026 +0800 Avoid login lock for pipe auth retry --- .../sink/protocol/legacy/IoTDBLegacyPipeSink.java | 7 +- .../task/subtask/PipeAbstractSinkSubtask.java | 9 ++- .../agent/task/subtask/PipeReportableSubtask.java | 40 +++++++++- .../pipe/sink/client/IoTDBSyncClientManager.java | 6 +- .../commons/pipe/task/PipeSleepIntervalTest.java | 86 ++++++++++++++++++---- 5 files changed, 127 insertions(+), 21 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java index c0a9eb6a79d..24544841688 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java @@ -278,8 +278,11 @@ public class IoTDBLegacyPipeSink implements PipeConnector { if (openSessionResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { final String errorMsg = String.format( - "Failed to login to receiver %s:%s for legacy pipe transfer because %s", - ipAddress, port, openSessionResp.getStatus().getMessage()); + "Failed to login to receiver %s:%s for legacy pipe transfer because code: %d, message: %s", + ipAddress, + port, + openSessionResp.getStatus().getCode(), + openSessionResp.getStatus().getMessage()); LOGGER.warn(errorMsg); throw new PipeRuntimeCriticalException(errorMsg); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java index 9dbf5af2d0c..610fc12ff4e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java @@ -177,7 +177,7 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask { MAX_RETRY_TIMES, e); try { - sleepIfNoHighPriorityTask(retry * PipeConfig.getInstance().getPipeSinkRetryIntervalMs()); + sleepIfNoHighPriorityTask(getHandshakeRetrySleepInterval(e, retry)); } catch (final InterruptedException interruptedException) { LOGGER.info( PipeMessages.INTERRUPTED_WHILE_SLEEPING_RETRY_HANDSHAKE, interruptedException); @@ -218,6 +218,13 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask { return false; } + private long getHandshakeRetrySleepInterval(final Throwable throwable, final int retry) { + final long defaultInterval = retry * PipeConfig.getInstance().getPipeSinkRetryIntervalMs(); + return isAuthenticationFailure(throwable) + ? Math.max(defaultInterval, AUTHENTICATION_FAILURE_RETRY_INTERVAL_MS) + : defaultInterval; + } + /** * Submit a {@link PipeSubtask} to the executor to keep it running. Note that the function will be * called when connector starts or the subTask finishes the last round, Thus the {@link diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java index 52bc3bd61f8..51693d944f6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java @@ -27,15 +27,33 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigur import org.apache.iotdb.commons.i18n.PipeMessages; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; +import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; public abstract class PipeReportableSubtask extends PipeSubtask { private static final Logger LOGGER = LoggerFactory.getLogger(PipeReportableSubtask.class); + private static final long DEFAULT_LOGIN_LOCK_WINDOW_MS = TimeUnit.MINUTES.toMillis(10); + private static final int DEFAULT_LOGIN_LOCK_FAILED_ATTEMPTS = 5; + private static final int AUTHENTICATION_FAILURE_IMMEDIATE_ATTEMPTS = 2; + protected static final long AUTHENTICATION_FAILURE_RETRY_INTERVAL_MS = + DEFAULT_LOGIN_LOCK_WINDOW_MS + / (DEFAULT_LOGIN_LOCK_FAILED_ATTEMPTS - AUTHENTICATION_FAILURE_IMMEDIATE_ATTEMPTS) + + TimeUnit.SECONDS.toMillis(1); + private static final Pattern AUTHENTICATION_FAILURE_STATUS_CODE_PATTERN = + Pattern.compile( + String.format( + "(?i)(?:\\b(?:code|status code)\\s*[:=]\\s*(?:%d|%d)\\b|\\b(?:%d|%d):|\\b(?:WRONG_LOGIN_PASSWORD|USER_LOGIN_LOCKED)\\b)", + TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode(), + TSStatusCode.USER_LOGIN_LOCKED.getStatusCode(), + TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode(), + TSStatusCode.USER_LOGIN_LOCKED.getStatusCode())); // To ensure that high-priority tasks can obtain object locks first, a counter is now used to save // the number of high-priority tasks. protected final AtomicLong highPriorityLockTaskCount = new AtomicLong(0); @@ -65,7 +83,7 @@ public abstract class PipeReportableSubtask extends PipeSubtask { // is dropped or the process is running normally. } - private long getSleepIntervalBasedOnThrowable(final Throwable throwable) { + protected long getSleepIntervalBasedOnThrowable(final Throwable throwable) { long sleepInterval = Math.min(1000L * retryCount.get(), 10000); // if receiver is read-only/internal-error/write-reject, connector will retry with // power-increasing interval @@ -76,9 +94,24 @@ public abstract class PipeReportableSubtask extends PipeSubtask { sleepInterval = 1000L * retryCount.get() * retryCount.get(); } } + if (isAuthenticationFailure(throwable)) { + sleepInterval = Math.max(sleepInterval, AUTHENTICATION_FAILURE_RETRY_INTERVAL_MS); + } return sleepInterval; } + protected static boolean isAuthenticationFailure(final Throwable throwable) { + Throwable current = throwable; + while (current != null) { + final String message = current.getMessage(); + if (message != null && AUTHENTICATION_FAILURE_STATUS_CODE_PATTERN.matcher(message).find()) { + return true; + } + current = current.getCause(); + } + return false; + } + private void onReportEventFailure(final Throwable throwable) { final int maxRetryTimes = throwable instanceof PipeRuntimeSinkRetryTimesConfigurableException @@ -199,10 +232,13 @@ public abstract class PipeReportableSubtask extends PipeSubtask { } protected void sleepIfNoHighPriorityTask(long sleepMillis) throws InterruptedException { + if (sleepMillis <= 0) { + return; + } synchronized (highPriorityLockTaskCount) { // The wait operation will release the highPriorityLockTaskCount lock, so there will be // no deadlock. - if (highPriorityLockTaskCount.get() > 0) { + if (highPriorityLockTaskCount.get() == 0) { highPriorityLockTaskCount.wait(sleepMillis); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java index ff62dbf477b..c6ad14aac4d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java @@ -270,7 +270,11 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen client.getIpAddress(), client.getPort(), resp.getStatus()); - endPoint2HandshakeErrorMessage.put(client.getEndPoint(), resp.getStatus().getMessage()); + endPoint2HandshakeErrorMessage.put( + client.getEndPoint(), + String.format( + "code: %d, message: %s", + resp.getStatus().getCode(), resp.getStatus().getMessage())); } else { clientAndStatus.setRight(true); client.setTimeout(CONNECTION_TIMEOUT_MS.get()); diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java index 07ae50e992e..a9b72391b2b 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java @@ -25,13 +25,48 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.concurrent.TimeUnit; + public class PipeSleepIntervalTest { + private static class TestSinkSubtask extends PipeAbstractSinkSubtask { + + TestSinkSubtask() { + super(null, 0, null); + } + + @Override + protected String getRootCause(Throwable throwable) { + return null; + } + + @Override + protected void report(EnrichedEvent event, PipeRuntimeException exception) {} + + @Override + protected boolean executeOnce() { + return false; + } + + long getSleepInterval(final Throwable throwable) { + return getSleepIntervalBasedOnThrowable(throwable); + } + + boolean isAuthenticationFailureException(final Throwable throwable) { + return isAuthenticationFailure(throwable); + } + + void sleepWithoutHighPriorityTask(final long sleepMillis) throws InterruptedException { + sleepIfNoHighPriorityTask(sleepMillis); + } + } + private long oldPipeSinkSubtaskSleepIntervalInitMs; private long oldPipeSinkSubtaskSleepIntervalMaxMs; @@ -53,21 +88,7 @@ public class PipeSleepIntervalTest { @Test public void test() { - try (final PipeAbstractSinkSubtask subtask = - new PipeAbstractSinkSubtask(null, 0, null) { - @Override - protected String getRootCause(Throwable throwable) { - return null; - } - - @Override - protected void report(EnrichedEvent event, PipeRuntimeException exception) {} - - @Override - protected boolean executeOnce() { - return false; - } - }) { + try (final TestSinkSubtask subtask = new TestSinkSubtask()) { long startTime = System.currentTimeMillis(); subtask.sleep4NonReportException(); Assert.assertTrue( @@ -80,4 +101,39 @@ public class PipeSleepIntervalTest { >= PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs()); } } + + @Test + public void testAuthenticationFailureRetryInterval() { + try (final TestSinkSubtask subtask = new TestSinkSubtask()) { + Assert.assertTrue( + subtask.isAuthenticationFailureException( + new PipeConnectionException( + "Handshake error with receiver 127.0.0.1:6667, code: 801, message: Authentication failed."))); + Assert.assertTrue( + subtask.isAuthenticationFailureException( + new PipeConnectionException("801: Failed to check password for pipe a2b."))); + Assert.assertTrue( + subtask.isAuthenticationFailureException( + new PipeConnectionException("status code: 822, message: Account is blocked."))); + Assert.assertFalse( + subtask.isAuthenticationFailureException( + new PipeConnectionException("Network error 801 bytes sent."))); + + final long authenticationFailureRetryInterval = + subtask.getSleepInterval(new PipeConnectionException("code: 801")); + Assert.assertTrue(authenticationFailureRetryInterval > TimeUnit.MINUTES.toMillis(3)); + Assert.assertTrue(authenticationFailureRetryInterval * 3 > TimeUnit.MINUTES.toMillis(10)); + Assert.assertTrue( + subtask.getSleepInterval(new PipeConnectionException("network error")) <= 10000); + } + } + + @Test + public void testSleepIfNoHighPriorityTaskWaits() throws Exception { + try (final TestSinkSubtask subtask = new TestSinkSubtask()) { + final long startTime = System.currentTimeMillis(); + subtask.sleepWithoutHighPriorityTask(20L); + Assert.assertTrue(System.currentTimeMillis() - startTime >= 15L); + } + } }
