This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch login-restart
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/login-restart by this push:
new bff87a80945 fix
bff87a80945 is described below
commit bff87a8094516d08baeaddd4bd02966931e70109
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 11 11:27:13 2026 +0800
fix
---
.../pipe/source/IoTDBConfigRegionSource.java | 12 +++++++++++-
.../pipe/sink/protocol/writeback/WriteBackSink.java | 21 +++++++++++++--------
.../source/dataregion/IoTDBDataRegionSource.java | 17 ++++++++++++++++-
.../schemaregion/IoTDBSchemaRegionSource.java | 20 +++++++++++++++++++-
4 files changed, 59 insertions(+), 11 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
index 47507c9a33b..68c259b9544 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
@@ -57,6 +57,8 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Collections;
@@ -109,7 +111,15 @@ public class IoTDBConfigRegionSource extends IoTDBNonDataRegionSource {
}
@Override
- protected void login() {}
+ protected void login(final @Nonnull String password) {
+ if (Objects.isNull(
+ ConfigNode.getInstance()
+ .getConfigManager()
+ .getPermissionManager()
+ .login4Pipe(userName, password))) {
+ throw new PipeException(String.format("Failed to check password for pipe %s.", pipeName));
+ }
+ }
@Override
protected AbstractPipeListeningQueue getListeningQueue() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
index 6bca2aa46ba..c119cee2856 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
@@ -197,14 +197,19 @@ public class WriteBackSink implements PipeConnector {
}
// Check the password and its expiration
- if (Objects.nonNull(passwordString)) {
- SESSION_MANAGER.login(
- session,
- usernameString,
- passwordString,
- ZoneId.systemDefault().toString(),
- SessionManager.CURRENT_RPC_VERSION,
- IoTDBConstant.ClientVersion.V_1_0);
+ if (Objects.nonNull(passwordString)
+ && SESSION_MANAGER
+ .login(
+ session,
+ usernameString,
+ passwordString,
+ ZoneId.systemDefault().toString(),
+ SessionManager.CURRENT_RPC_VERSION,
+ IoTDBConstant.ClientVersion.V_1_0)
+ .getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(
+ String.format("Failed to check password for pipe %s.", environment.getPipeName()));
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index aeb817b89ec..39d14a4d925 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.source.dataregion;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
@@ -39,6 +40,8 @@ import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegio
import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionLogSource;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionTsFileSource;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.pipe.api.annotation.TableModel;
import org.apache.iotdb.pipe.api.annotation.TreeModel;
@@ -55,6 +58,9 @@ import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+
+import java.time.ZoneId;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
@@ -547,7 +553,16 @@ public class IoTDBDataRegionSource extends IoTDBSource {
}
@Override
- protected void login() {}
+ protected void login(final @Nonnull String password) {
+ SessionManager.getInstance()
+ .login(
+ new InternalClientSession("Source_login_session_" + regionId),
+ userName,
+ password,
+ ZoneId.systemDefault().toString(),
+ SessionManager.CURRENT_RPC_VERSION,
+ IoTDBConstant.ClientVersion.V_1_0);
+ }
@Override
public void start() throws Exception {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
index c4795f8bd04..c0e7c392337 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
@@ -42,6 +42,8 @@ import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEve
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionSourceMetrics;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementToBatchVisitor;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -62,8 +64,11 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.common.constant.TsFileConstant;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.nio.file.Paths;
+import java.time.ZoneId;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -147,7 +152,20 @@ public class IoTDBSchemaRegionSource extends IoTDBNonDataRegionSource {
}
@Override
- protected void login() {}
+ protected void login(final @Nonnull String password) {
+ if (SessionManager.getInstance()
+ .login(
+ new InternalClientSession("Source_login_session_" + regionId),
+ userName,
+ password,
+ ZoneId.systemDefault().toString(),
+ SessionManager.CURRENT_RPC_VERSION,
+ IoTDBConstant.ClientVersion.V_1_0)
+ .getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(String.format("Failed to check password for pipe %s.", pipeName));
+ }
+ }
@Override
protected boolean needTransferSnapshot() {