This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch login-restart
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/login-restart by this push:
     new bff87a80945 fix
bff87a80945 is described below

commit bff87a8094516d08baeaddd4bd02966931e70109
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 11 11:27:13 2026 +0800

    fix
---
 .../pipe/source/IoTDBConfigRegionSource.java        | 12 +++++++++++-
 .../pipe/sink/protocol/writeback/WriteBackSink.java | 21 +++++++++++++--------
 .../source/dataregion/IoTDBDataRegionSource.java    | 17 ++++++++++++++++-
 .../schemaregion/IoTDBSchemaRegionSource.java       | 20 +++++++++++++++++++-
 4 files changed, 59 insertions(+), 11 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
index 47507c9a33b..68c259b9544 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
@@ -57,6 +57,8 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.Collections;
@@ -109,7 +111,15 @@ public class IoTDBConfigRegionSource extends 
IoTDBNonDataRegionSource {
   }
 
   @Override
-  protected void login() {}
+  protected void login(final @Nonnull String password) {
+    if (Objects.isNull(
+        ConfigNode.getInstance()
+            .getConfigManager()
+            .getPermissionManager()
+            .login4Pipe(userName, password))) {
+      throw new PipeException(String.format("Failed to check password for pipe 
%s.", pipeName));
+    }
+  }
 
   @Override
   protected AbstractPipeListeningQueue getListeningQueue() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
index 6bca2aa46ba..c119cee2856 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
@@ -197,14 +197,19 @@ public class WriteBackSink implements PipeConnector {
     }
 
     // Check the password and its expiration
-    if (Objects.nonNull(passwordString)) {
-      SESSION_MANAGER.login(
-          session,
-          usernameString,
-          passwordString,
-          ZoneId.systemDefault().toString(),
-          SessionManager.CURRENT_RPC_VERSION,
-          IoTDBConstant.ClientVersion.V_1_0);
+    if (Objects.nonNull(passwordString)
+        && SESSION_MANAGER
+                .login(
+                    session,
+                    usernameString,
+                    passwordString,
+                    ZoneId.systemDefault().toString(),
+                    SessionManager.CURRENT_RPC_VERSION,
+                    IoTDBConstant.ClientVersion.V_1_0)
+                .getCode()
+            != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(
+          String.format("Failed to check password for pipe %s.", 
environment.getPipeName()));
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index aeb817b89ec..39d14a4d925 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.source.dataregion;
 
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
@@ -39,6 +40,8 @@ import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegio
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionLogSource;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionTsFileSource;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
@@ -55,6 +58,9 @@ import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicReference;
@@ -547,7 +553,16 @@ public class IoTDBDataRegionSource extends IoTDBSource {
   }
 
   @Override
-  protected void login() {}
+  protected void login(final @Nonnull String password) {
+    SessionManager.getInstance()
+        .login(
+            new InternalClientSession("Source_login_session_" + regionId),
+            userName,
+            password,
+            ZoneId.systemDefault().toString(),
+            SessionManager.CURRENT_RPC_VERSION,
+            IoTDBConstant.ClientVersion.V_1_0);
+  }
 
   @Override
   public void start() throws Exception {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
index c4795f8bd04..c0e7c392337 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
@@ -42,6 +42,8 @@ import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEve
 import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionSourceMetrics;
 import 
org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementToBatchVisitor;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -62,8 +64,11 @@ import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.common.constant.TsFileConstant;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.time.ZoneId;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -147,7 +152,20 @@ public class IoTDBSchemaRegionSource extends 
IoTDBNonDataRegionSource {
   }
 
   @Override
-  protected void login() {}
+  protected void login(final @Nonnull String password) {
+    if (SessionManager.getInstance()
+            .login(
+                new InternalClientSession("Source_login_session_" + regionId),
+                userName,
+                password,
+                ZoneId.systemDefault().toString(),
+                SessionManager.CURRENT_RPC_VERSION,
+                IoTDBConstant.ClientVersion.V_1_0)
+            .getCode()
+        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(String.format("Failed to check password for pipe 
%s.", pipeName));
+    }
+  }
 
   @Override
   protected boolean needTransferSnapshot() {

Reply via email to