This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 85ecbd60826 Pipe: Implemented the login check in customize method & 
Added mosaic to subtask string in logger & Optimized the error code of illegal 
pipe / pipe plugin name string (#17197)
85ecbd60826 is described below

commit 85ecbd608265313eac960520ec46eb03d833b169
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 25 17:46:46 2026 +0800

    Pipe: Implemented the login check in customize method & Added mosaic to 
subtask string in logger & Optimized the error code of illegal pipe / pipe 
plugin name string (#17197)
    
    * wb
    
    * expr
    
    * login
    
    * login
    
    * fix
    
    * fix
    
    * semantic
    
    * mosaic
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * lo
    
    * if
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * may-fix
    
    * fix
    
    * part
    
    * fix
    
    * pwce
    
    * fix
    
    * initial
    
    * partial
    
    * bug-fix
    
    * fix
    
    * ..
    
    * st@ong
    
    * fix
    
    * fb
    
    * 701
    
    * bug-fix
    
    * bug-fix
    
    * further
    
    * fix
    
    * fix
---
 .../treemodel/auto/basic/IoTDBPipeSyntaxIT.java    |   4 +-
 .../treemodel/manual/IoTDBPipePermissionIT.java    | 128 ++++++++++++++++++++-
 .../api/customizer/parameter/PipeParameters.java   |   4 +-
 .../api/exception/PipePasswordCheckException.java  |  26 +++++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 .../iotdb/confignode/manager/ConfigManager.java    |   5 +-
 .../apache/iotdb/confignode/manager/IManager.java  |   3 +-
 .../confignode/manager/PermissionManager.java      |   5 +-
 .../receiver/protocol/IoTDBConfigNodeReceiver.java |   2 +-
 .../pipe/source/IoTDBConfigRegionSource.java       |  17 +++
 .../confignode/persistence/auth/AuthorInfo.java    |   5 +-
 .../persistence/auth/AuthorPlanExecutor.java       |   5 +-
 .../persistence/auth/IAuthorPlanExecutor.java      |   3 +-
 .../impl/pipe/task/AlterPipeProcedureV2.java       |   8 +-
 .../impl/pipe/task/CreatePipeProcedureV2.java      |   8 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   5 +-
 .../confignode/persistence/AuthorInfoTest.java     |   2 +-
 .../pipe/metric/PipeConsensusSyncLagManager.java   |  10 +-
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |  10 +-
 .../iotdb/db/auth/ClusterAuthorityFetcher.java     |  26 +++--
 .../apache/iotdb/db/auth/IAuthorityFetcher.java    |   3 +-
 .../pipe/agent/plugin/PipeDataNodePluginAgent.java |  40 ++++---
 .../dataregion/PipeDataRegionPluginAgent.java      |  22 ++--
 .../overview/PipeDataNodeSinglePipeMetrics.java    |  16 +--
 .../schema/PipeSchemaRegionSourceMetrics.java      |  38 +++---
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |   2 +-
 .../sink/protocol/writeback/WriteBackSink.java     |  23 ++++
 .../source/dataregion/IoTDBDataRegionSource.java   | 114 +++++++++++-------
 ...istoricalDataRegionTsFileAndDeletionSource.java |  17 +--
 .../realtime/PipeRealtimeDataRegionSource.java     |  10 +-
 .../schemaregion/IoTDBSchemaRegionSource.java      |  32 +++++-
 .../schemaregion/SchemaRegionListeningFilter.java  |   7 +-
 .../db/protocol/session/InternalClientSession.java |   2 +-
 .../iotdb/db/protocol/session/SessionManager.java  |  26 ++++-
 .../execution/config/TreeConfigTaskVisitor.java    |   7 +-
 .../config/executor/ClusterConfigTaskExecutor.java | 121 +++++++++++++------
 .../execution/config/sys/pipe/AlterPipeTask.java   |   8 +-
 .../execution/config/sys/pipe/CreatePipeTask.java  |  20 ++--
 .../config/sys/pipe/PipeFunctionSupport.java       |   2 +-
 .../apache/iotdb/db/utils/DataNodeAuthUtils.java   |   6 +-
 .../auth/authorizer/LocalFileAuthorizerTest.java   |   8 +-
 .../db/auth/authorizer/OpenIdAuthorizerTest.java   |   8 +-
 .../commons/auth/authorizer/BasicAuthorizer.java   |   7 +-
 .../iotdb/commons/auth/authorizer/IAuthorizer.java |   3 +-
 .../commons/auth/authorizer/OpenIdAuthorizer.java  |   3 +-
 .../commons/pipe/agent/plugin/PipePluginAgent.java |  33 +++---
 .../agent/task/execution/PipeSubtaskExecutor.java  |  18 +--
 .../pipe/source/IoTDBNonDataRegionSource.java      |   3 +
 .../iotdb/commons/pipe/source/IoTDBSource.java     |  19 ++-
 .../src/main/thrift/confignode.thrift              |   1 +
 50 files changed, 643 insertions(+), 253 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
index 6ae8d56bfaa..87bc7b0e95b 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
@@ -396,7 +396,7 @@ public class IoTDBPipeSyntaxIT extends 
AbstractPipeDualTreeModelAutoIT {
                   + "PipePlugin.jar"));
       fail();
     } catch (final SQLException e) {
-      Assert.assertTrue(e.getMessage().contains("1600: Failed to create pipe 
plugin"));
+      Assert.assertTrue(e.getMessage().contains("701: Failed to create pipe 
plugin"));
     }
   }
 
@@ -860,7 +860,7 @@ public class IoTDBPipeSyntaxIT extends 
AbstractPipeDualTreeModelAutoIT {
         fail();
       } catch (final SQLException e) {
         Assert.assertEquals(
-            "1603: Failed to get executable for PipePlugin TestProcessor, 
please check the URI.",
+            "701: Failed to get executable for PipePlugin TestProcessor, 
please check the URI.",
             e.getMessage());
       }
       try {
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
index 35403c4dc6d..24807622cf6 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
@@ -311,8 +311,7 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualTreeModelManualIT {
               
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
       fail("Shall fail if password is wrong.");
     } catch (final SQLException e) {
-      Assert.assertTrue(
-          e.getMessage().contains("Fail to CREATE_PIPE because Authentication 
failed."));
+      Assert.assertEquals("801: Failed to check password for pipe a2b.", 
e.getMessage());
     }
 
     // Use current session, user is root
@@ -506,4 +505,129 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualTreeModelManualIT {
       fail(e.getMessage());
     }
   }
+
+  @Test
+  public void testIllegalPassword() throws Exception {
+    TestUtils.executeNonQueries(
+        senderEnv,
+        Arrays.asList(
+            "create user `thulab` 'ST@ongpasswd123456'",
+            "create role `admin`",
+            "grant role `admin` to `thulab`",
+            "grant WRITE, READ, SYSTEM, SECURITY on root.** to role `admin`"),
+        null);
+
+    TestUtils.executeNonQuery(
+        senderEnv,
+        "create aligned timeSeries root.vehicle.plane(temperature DOUBLE, 
pressure INT32)");
+    TestUtils.executeNonQuery(
+        receiverEnv,
+        "create aligned timeSeries root.vehicle.plane(temperature DOUBLE, 
pressure INT32)");
+
+    Connection connection = senderEnv.getConnection();
+    Statement statement = connection.createStatement();
+    try {
+      statement.execute(
+          String.format(
+              "create pipe a2b"
+                  + " with source ("
+                  + "'user'='thulab'"
+                  + ", 'password'='passwd')"
+                  + " with sink ("
+                  + "'node-urls'='%s')",
+              
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
+      fail();
+    } catch (final Exception e) {
+      Assert.assertEquals("801: Failed to check password for pipe a2b.", 
e.getMessage());
+    }
+
+    try {
+      statement.execute(
+          "create pipe a2b ('sink'='write-back-sink', 'user'='thulab', 
'password'='passwd')");
+      fail();
+    } catch (final Exception e) {
+      Assert.assertEquals("801: Failed to check password for pipe a2b.", 
e.getMessage());
+    }
+
+    statement.execute(
+        String.format(
+            "create pipe a2b"
+                + " with source ("
+                + "'user'='thulab'"
+                + ", 'password'='ST@ongpasswd123456')"
+                + " with sink ("
+                + "'node-urls'='%s')",
+            receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
+
+    TestUtils.executeNonQuery(
+        senderEnv, "insert into root.vehicle.plane(temperature, pressure) 
values (36.5, 1103)");
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "select count(pressure) from root.vehicle.plane",
+        "count(root.vehicle.plane.pressure),",
+        Collections.singleton("1,"));
+
+    statement.execute("alter user thulab set password 'newST@ongPassword'");
+
+    try {
+      TestUtils.restartCluster(senderEnv);
+    } catch (final Throwable e) {
+      e.printStackTrace();
+      return;
+    }
+
+    connection = senderEnv.getConnection();
+    statement = connection.createStatement();
+    TestUtils.executeNonQuery(
+        senderEnv, "insert into root.vehicle.plane(temperature, pressure) 
values (36.5, 1103)");
+
+    TestUtils.assertDataAlwaysOnEnv(
+        receiverEnv,
+        "select count(pressure) from root.vehicle.plane",
+        "count(root.vehicle.plane.pressure),",
+        Collections.singleton("1,"));
+
+    try {
+      statement.execute("alter pipe a2b modify source ('password'='fake')");
+    } catch (final SQLException e) {
+      Assert.assertEquals("801: Failed to check password for pipe a2b.", 
e.getMessage());
+    }
+
+    statement.execute("alter pipe a2b modify source 
('password'='newST@ongPassword')");
+
+    // Test empty alter
+    statement.execute("alter pipe a2b");
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "select count(pressure) from root.vehicle.plane",
+        "count(root.vehicle.plane.pressure),",
+        Collections.singleton("2,"));
+
+    statement.execute("alter user thulab set password 
'anotherST@ongPassword'");
+
+    try {
+      TestUtils.restartCluster(senderEnv);
+    } catch (final Throwable e) {
+      e.printStackTrace();
+      return;
+    }
+
+    connection = senderEnv.getConnection();
+    statement = connection.createStatement();
+    TestUtils.executeNonQuery(
+        senderEnv, "insert into root.vehicle.plane(temperature, pressure) 
values (36.5, 1103)");
+    statement.execute("alter user thulab set password 'newST@ongPassword'");
+    statement.execute("alter pipe a2b");
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "select count(pressure) from root.vehicle.plane",
+        "count(root.vehicle.plane.pressure),",
+        Collections.singleton("3,"));
+
+    statement.close();
+    connection.close();
+  }
 }
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index c4e7edd63d0..c13f87ae579 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -371,7 +371,7 @@ public class PipeParameters {
     return new PipeParameters(thisMap);
   }
 
-  private static class KeyReducer {
+  public static class KeyReducer {
 
     private static final Set<String> FIRST_PREFIXES = new HashSet<>();
     private static final Set<String> SECOND_PREFIXES = new HashSet<>();
@@ -399,7 +399,7 @@ public class PipeParameters {
       return key;
     }
 
-    static String reduce(String key) {
+    public static String reduce(String key) {
       if (key == null) {
         return null;
       }
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipePasswordCheckException.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipePasswordCheckException.java
new file mode 100644
index 00000000000..10dce32c591
--- /dev/null
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipePasswordCheckException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.exception;
+
+public class PipePasswordCheckException extends PipeException {
+  public PipePasswordCheckException(final String message) {
+    super(message);
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 019e643fa33..6af20dc2f53 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -265,6 +265,7 @@ public enum TSStatusCode {
   CREATE_PIPE_PLUGIN_ERROR(1600),
   DROP_PIPE_PLUGIN_ERROR(1601),
   PIPE_PLUGIN_LOAD_CLASS_ERROR(1602),
+  @Deprecated
   PIPE_PLUGIN_DOWNLOAD_ERROR(1603),
   CREATE_PIPE_PLUGIN_ON_DATANODE_ERROR(1604),
   DROP_PIPE_PLUGIN_ON_DATANODE_ERROR(1605),
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index e51d1a43299..f455edb26b8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1332,10 +1332,11 @@ public class ConfigManager implements IManager {
   }
 
   @Override
-  public TPermissionInfoResp login(String username, String password) {
+  public TPermissionInfoResp login(
+      final String username, final String password, final boolean 
useEncryptedPassword) {
     TSStatus status = confirmLeader();
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      return permissionManager.login(username, password);
+      return permissionManager.login(username, password, useEncryptedPassword);
     } else {
       TPermissionInfoResp resp = AuthUtils.generateEmptyPermissionInfoResp();
       resp.setStatus(status);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index fe83b8bd0c1..02c82164595 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -501,7 +501,8 @@ public interface IManager {
   DataSet queryPermission(final AuthorPlan authorPlan);
 
   /** login. */
-  TPermissionInfoResp login(String username, String password);
+  TPermissionInfoResp login(
+      final String username, final String password, final boolean 
useEncryptedPassword);
 
   /** Check User Privileges. */
   TPermissionInfoResp checkUserPrivileges(String username, PrivilegeUnion 
union);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index 06be2818593..ee39bfc2b19 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -112,8 +112,9 @@ public class PermissionManager {
     return configManager.getConsensusManager();
   }
 
-  public TPermissionInfoResp login(String username, String password) {
-    return authorInfo.login(username, password);
+  public TPermissionInfoResp login(
+      final String username, final String password, final boolean 
useEncryptedPassword) {
+    return authorInfo.login(username, password, useEncryptedPassword);
   }
 
   public String login4Pipe(final String userName, final String password) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index d9f6e07be2b..81e82b96ade 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -1225,7 +1225,7 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
 
   @Override
   protected TSStatus login() {
-    return configManager.login(username, password).getStatus();
+    return configManager.login(username, password, false).getStatus();
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
index 60512887703..eca4943c46f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
@@ -55,8 +55,11 @@ import org.apache.iotdb.pipe.api.annotation.TreeModel;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipePasswordCheckException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.Collections;
@@ -108,6 +111,20 @@ public class IoTDBConfigRegionSource extends 
IoTDBNonDataRegionSource {
     PipeConfigNodeRemainingTimeMetrics.getInstance().register(this);
   }
 
+  @Override
+  protected void login(final @Nonnull String password) {
+    if (ConfigNode.getInstance()
+            .getConfigManager()
+            .getPermissionManager()
+            .login(userName, password, true)
+            .getStatus()
+            .getCode()
+        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipePasswordCheckException(
+          String.format("Failed to check password for pipe %s.", pipeName));
+    }
+  }
+
   @Override
   protected AbstractPipeListeningQueue getListeningQueue() {
     return PipeConfigNodeAgent.runtime().listener();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorInfo.java
index 743e0a3f09a..d7404009825 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorInfo.java
@@ -124,8 +124,9 @@ public class AuthorInfo implements SnapshotProcessor {
     this.authorPlanExecutor = authorPlanExecutor;
   }
 
-  public TPermissionInfoResp login(String username, String password) {
-    return authorPlanExecutor.login(username, password);
+  public TPermissionInfoResp login(
+      final String username, final String password, final boolean 
useEncryptedPassword) {
+    return authorPlanExecutor.login(username, password, useEncryptedPassword);
   }
 
   public String login4Pipe(final String username, final String password) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorPlanExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorPlanExecutor.java
index 5e8843418c1..8f216df3250 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorPlanExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorPlanExecutor.java
@@ -74,13 +74,14 @@ public class AuthorPlanExecutor implements 
IAuthorPlanExecutor {
   }
 
   @Override
-  public TPermissionInfoResp login(String username, String password) {
+  public TPermissionInfoResp login(
+      String username, final String password, final boolean 
useEncryptedPassword) {
     boolean status;
     String loginMessage = null;
     TSStatus tsStatus = new TSStatus();
     TPermissionInfoResp result = new TPermissionInfoResp();
     try {
-      status = authorizer.login(username, password);
+      status = authorizer.login(username, password, useEncryptedPassword);
       if (status) {
         // Bring this user's permission information back to the datanode for 
caching
         if (authorizer instanceof OpenIdAuthorizer) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/IAuthorPlanExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/IAuthorPlanExecutor.java
index 24f0d8ceb0a..724ebf43f73 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/IAuthorPlanExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/IAuthorPlanExecutor.java
@@ -33,7 +33,8 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TAuthizedPatternTreeResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 
 public interface IAuthorPlanExecutor {
-  TPermissionInfoResp login(String username, String password);
+  TPermissionInfoResp login(
+      final String username, final String password, final boolean 
useEncryptedPassword);
 
   String login4Pipe(final String username, final String password);
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 53f908bf4fc..921661d13de 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -107,14 +107,18 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
                 PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY,
                 PipeSourceConstant.SOURCE_IOTDB_USER_KEY,
                 PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY,
-                PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY);
+                PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY,
+                PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY,
+                PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY);
     final boolean checkSink =
         new PipeParameters(alterPipeRequest.getConnectorAttributes())
             .hasAnyAttributes(
                 PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY,
                 PipeSinkConstant.SINK_IOTDB_USER_KEY,
                 PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY,
-                PipeSinkConstant.SINK_IOTDB_USERNAME_KEY);
+                PipeSinkConstant.SINK_IOTDB_USERNAME_KEY,
+                PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY,
+                PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY);
 
     pipeTaskInfo.get().checkAndUpdateRequestBeforeAlterPipe(alterPipeRequest);
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index f99293f0a02..f880bbdb85c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -174,7 +174,9 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     if 
(sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY)
         || 
sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USER_KEY)
         || 
sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY)
-        || 
sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)) {
+        || 
sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)
+        || 
sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY)
+        || 
sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)) {
       final String hashedPassword =
           env.getConfigManager()
               .getPermissionManager()
@@ -216,7 +218,9 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     if (sinkParameters.hasAttribute(PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY)
         || sinkParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_USER_KEY)
         || 
sinkParameters.hasAttribute(PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY)
-        || 
sinkParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_USERNAME_KEY)) {
+        || 
sinkParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_USERNAME_KEY)
+        || 
sinkParameters.hasAttribute(PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY)
+        || 
sinkParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY)) {
       final String hashedPassword =
           env.getConfigManager()
               .getPermissionManager()
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 26fed0e1c4a..5d6aa8da9f5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -744,7 +744,10 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
 
   @Override
   public TPermissionInfoResp login(TLoginReq req) {
-    return configManager.login(req.getUserrname(), req.getPassword());
+    return configManager.login(
+        req.getUserrname(),
+        req.getPassword(),
+        req.isSetUseEncryptedPassword() && req.isUseEncryptedPassword());
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java
index 151307e280d..34a9a411133 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java
@@ -664,7 +664,7 @@ public class AuthorInfoTest {
             new ArrayList<>());
     status = authorInfo.authorNonQuery(authorPlan);
     assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
-    TPermissionInfoResp result = authorInfo.login("testuser", 
"password123456");
+    TPermissionInfoResp result = authorInfo.login("testuser", 
"password123456", false);
     assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
result.getStatus().getCode());
   }
 
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
index bd9a2fd73d3..ac89d99df35 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
@@ -126,27 +126,27 @@ public class PipeConsensusSyncLagManager {
   }
 
   private static class PipeConsensusSyncLagManagerHolder {
-    private static Map<String, PipeConsensusSyncLagManager> 
CONSENSU_GROUP_ID_2_INSTANCE_MAP;
+    private static Map<String, PipeConsensusSyncLagManager> 
CONSENSUS_GROUP_ID_2_INSTANCE_MAP;
 
     private PipeConsensusSyncLagManagerHolder() {
       // empty constructor
     }
 
     private static void build() {
-      if (CONSENSU_GROUP_ID_2_INSTANCE_MAP == null) {
-        CONSENSU_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>();
+      if (CONSENSUS_GROUP_ID_2_INSTANCE_MAP == null) {
+        CONSENSUS_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>();
       }
     }
   }
 
   public static PipeConsensusSyncLagManager getInstance(String groupId) {
-    return 
PipeConsensusSyncLagManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent(
+    return 
PipeConsensusSyncLagManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent(
         groupId, key -> new PipeConsensusSyncLagManager());
   }
 
   public static void release(String groupId) {
     PipeConsensusSyncLagManager.getInstance(groupId).clear();
-    
PipeConsensusSyncLagManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.remove(groupId);
+    
PipeConsensusSyncLagManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP.remove(groupId);
   }
 
   // Only when consensus protocol is PipeConsensus, this method will be called 
once when construct
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
index 81db578ace1..21c3bc787ee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
@@ -142,8 +142,14 @@ public class AuthorityChecker {
     return Optional.ofNullable(user == null ? null : user.getUserId());
   }
 
-  public static TSStatus checkUser(String userName, String password) {
-    TSStatus status = authorityFetcher.get().checkUser(userName, password);
+  public static TSStatus checkUser(final String userName, final String 
password) {
+    return checkUser(userName, password, false);
+  }
+
+  public static TSStatus checkUser(
+      final String userName, final String password, final boolean 
useEncryptedPassword) {
+    final TSStatus status =
+        authorityFetcher.get().checkUser(userName, password, 
useEncryptedPassword);
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       return status;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
index 2894ce8bb47..325476174c6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
@@ -543,23 +543,31 @@ public class ClusterAuthorityFetcher implements 
IAuthorityFetcher {
   }
 
   @Override
-  public TSStatus checkUser(String username, String password) {
+  public TSStatus checkUser(
+      final String username, final String password, final boolean 
useEncryptedPassword) {
     checkCacheAvailable();
-    User user = iAuthorCache.getUserCache(username);
+    final User user = iAuthorCache.getUserCache(username);
     if (user != null) {
       if (user.isOpenIdUser()) {
         return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-      } else if (password != null && AuthUtils.validatePassword(password, 
user.getPassword())) {
-        return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-      } else if (password != null
-          && AuthUtils.validatePassword(
-              password, user.getPassword(), 
AsymmetricEncrypt.DigestAlgorithm.MD5)) {
-        return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+      } else if (password != null) {
+        if (useEncryptedPassword) {
+          return password.equals(user.getPassword())
+              ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+              : RpcUtils.getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD, 
"Authentication failed.");
+        } else {
+          return AuthUtils.validatePassword(password, user.getPassword())
+                  || AuthUtils.validatePassword(
+                      password, user.getPassword(), 
AsymmetricEncrypt.DigestAlgorithm.MD5)
+              ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+              : RpcUtils.getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD, 
"Authentication failed.");
+        }
       } else {
         return RpcUtils.getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD, 
"Authentication failed.");
       }
     } else {
-      TLoginReq req = new TLoginReq(username, password);
+      TLoginReq req =
+          new TLoginReq(username, 
password).setUseEncryptedPassword(useEncryptedPassword);
       TPermissionInfoResp status = null;
       try (ConfigNodeClient configNodeClient =
           
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java
index 3dc95fa41dc..30267987895 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java
@@ -36,7 +36,8 @@ import java.util.List;
 
 public interface IAuthorityFetcher {
 
-  TSStatus checkUser(String username, String password);
+  TSStatus checkUser(
+      final String username, final String password, final boolean 
useEncryptedPassword);
 
   boolean checkRole(String username, String roleName);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
index 11c1edafccb..d57956109d4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
@@ -27,7 +27,9 @@ import 
org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableMa
 import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils;
 import 
org.apache.iotdb.db.pipe.agent.plugin.dataregion.PipeDataRegionPluginAgent;
 import 
org.apache.iotdb.db.pipe.agent.plugin.schemaregion.PipeSchemaRegionPluginAgent;
+import 
org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningFilter;
 import org.apache.iotdb.pipe.api.PipePlugin;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.slf4j.Logger;
@@ -70,7 +72,7 @@ public class PipeDataNodePluginAgent {
 
   /////////////////////////////// Pipe Plugin Management 
///////////////////////////////
 
-  public void register(PipePluginMeta pipePluginMeta, ByteBuffer jarFile)
+  public void register(final PipePluginMeta pipePluginMeta, final ByteBuffer 
jarFile)
       throws IOException, PipeException {
     lock.lock();
     try {
@@ -86,7 +88,7 @@ public class PipeDataNodePluginAgent {
     }
   }
 
-  private void checkIfRegistered(PipePluginMeta pipePluginMeta) throws 
PipeException {
+  private void checkIfRegistered(final PipePluginMeta pipePluginMeta) throws 
PipeException {
     final String pluginName = pipePluginMeta.getPluginName();
     final PipePluginMeta information = 
pipePluginMetaKeeper.getPipePluginMeta(pluginName);
     if (information == null) {
@@ -121,7 +123,8 @@ public class PipeDataNodePluginAgent {
     // we allow users to register the same pipe plugin multiple times without 
any error
   }
 
-  private void saveJarFileIfNeeded(String pluginName, String jarName, 
ByteBuffer byteBuffer)
+  private void saveJarFileIfNeeded(
+      final String pluginName, final String jarName, final ByteBuffer 
byteBuffer)
       throws IOException {
     if (byteBuffer != null) {
       PipePluginExecutableManager.getInstance()
@@ -136,7 +139,7 @@ public class PipeDataNodePluginAgent {
    * @param pipePluginMeta the meta information of the PipePlugin
    * @throws PipeException if the PipePlugin can not be loaded or its instance 
can not be created
    */
-  public void doRegister(PipePluginMeta pipePluginMeta) throws PipeException {
+  public void doRegister(final PipePluginMeta pipePluginMeta) throws 
PipeException {
     final String pluginName = pipePluginMeta.getPluginName();
     final String className = pipePluginMeta.getClassName();
 
@@ -156,7 +159,7 @@ public class PipeDataNodePluginAgent {
       pipePluginMetaKeeper.addPipePluginVisibility(
           pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass));
       classLoaderManager.addPluginAndClassLoader(pluginName, 
pipePluginClassLoader);
-    } catch (IOException
+    } catch (final IOException
         | InstantiationException
         | InvocationTargetException
         | NoSuchMethodException
@@ -173,7 +176,8 @@ public class PipeDataNodePluginAgent {
     }
   }
 
-  public void deregister(String pluginName, boolean needToDeleteJar) throws 
PipeException {
+  public void deregister(final String pluginName, final boolean 
needToDeleteJar)
+      throws PipeException {
     lock.lock();
     try {
       final PipePluginMeta information = 
pipePluginMetaKeeper.getPipePluginMeta(pluginName);
@@ -197,7 +201,7 @@ public class PipeDataNodePluginAgent {
         PipePluginExecutableManager.getInstance()
             .removeFileUnderTemporaryRoot(pluginName.toUpperCase() + ".txt");
       }
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new PipeException(e.getMessage(), e);
     } finally {
       lock.unlock();
@@ -206,20 +210,22 @@ public class PipeDataNodePluginAgent {
 
   // TODO: validate pipe plugin attributes for config node
   public void validate(
-      String pipeName,
-      Map<String, String> extractorAttributes,
-      Map<String, String> processorAttributes,
-      Map<String, String> connectorAttributes)
+      final String pipeName,
+      final Map<String, String> sourceAttributes,
+      final Map<String, String> processorAttributes,
+      final Map<String, String> sinkAttributes)
       throws Exception {
-    dataRegionAgent.validate(
-        pipeName, extractorAttributes, processorAttributes, 
connectorAttributes);
-    schemaRegionAgent.validate(
-        pipeName, extractorAttributes, processorAttributes, 
connectorAttributes);
+    dataRegionAgent.validate(pipeName, sourceAttributes, processorAttributes, 
sinkAttributes);
+
+    if (SchemaRegionListeningFilter.shouldSchemaRegionBeListened(
+        new PipeParameters(sinkAttributes))) {
+      schemaRegionAgent.validate(pipeName, sourceAttributes, 
processorAttributes, sinkAttributes);
+    }
   }
 
   public boolean checkIfPluginSameType(final String oldPluginName, final 
String newPluginName) {
-    PipePluginMeta oldPipePluginMeta = 
pipePluginMetaKeeper.getPipePluginMeta(oldPluginName);
-    PipePluginMeta newPipePluginMeta = 
pipePluginMetaKeeper.getPipePluginMeta(newPluginName);
+    final PipePluginMeta oldPipePluginMeta = 
pipePluginMetaKeeper.getPipePluginMeta(oldPluginName);
+    final PipePluginMeta newPipePluginMeta = 
pipePluginMetaKeeper.getPipePluginMeta(newPluginName);
 
     if (oldPipePluginMeta == null) {
       throw new PipeException(String.format("plugin %s is not registered.", 
oldPluginName));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
index a8d002fb27c..67a7d6549d9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
@@ -62,24 +62,24 @@ public class PipeDataRegionPluginAgent extends 
PipePluginAgent {
 
   @Override
   public void validate(
-      String pipeName,
-      Map<String, String> sourceAttributes,
-      Map<String, String> processorAttributes,
-      Map<String, String> sinkAttributes)
+      final String pipeName,
+      final Map<String, String> sourceAttributes,
+      final Map<String, String> processorAttributes,
+      final Map<String, String> sinkAttributes)
       throws Exception {
-    PipeExtractor temporaryExtractor = validateSource(sourceAttributes);
-    PipeProcessor temporaryProcessor = validateProcessor(processorAttributes);
-    PipeConnector temporaryConnector = validateSink(pipeName, sinkAttributes);
+    final PipeExtractor temporaryExtractor = validateSource(pipeName, 
sourceAttributes);
+    final PipeProcessor temporaryProcessor = 
validateProcessor(processorAttributes);
+    final PipeConnector temporaryConnector = validateSink(pipeName, 
sinkAttributes);
 
     // validate visibility
     // TODO: validate visibility for schema region and config region
-    Visibility pipeVisibility =
+    final Visibility pipeVisibility =
         VisibilityUtils.calculateFromExtractorParameters(new 
PipeParameters(sourceAttributes));
-    Visibility extractorVisibility =
+    final Visibility extractorVisibility =
         
VisibilityUtils.calculateFromPluginClass(temporaryExtractor.getClass());
-    Visibility processorVisibility =
+    final Visibility processorVisibility =
         
VisibilityUtils.calculateFromPluginClass(temporaryProcessor.getClass());
-    Visibility connectorVisibility =
+    final Visibility connectorVisibility =
         
VisibilityUtils.calculateFromPluginClass(temporaryConnector.getClass());
     if (!VisibilityUtils.isCompatible(
         pipeVisibility, extractorVisibility, processorVisibility, 
connectorVisibility)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
index 8c1ef90f97f..6535d371a91 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
@@ -195,29 +195,29 @@ public class PipeDataNodeSinglePipeMetrics implements 
IMetricSet {
 
   //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
 
-  public void register(final IoTDBDataRegionSource extractor) {
+  public void register(final IoTDBDataRegionSource source) {
     // The metric is global thus the regionId is omitted
-    final String pipeID = extractor.getPipeName() + "_" + 
extractor.getCreationTime();
+    final String pipeID = source.getPipeName() + "_" + 
source.getCreationTime();
     remainingEventAndTimeOperatorMap.computeIfAbsent(
         pipeID,
         k ->
             new PipeDataNodeRemainingEventAndTimeOperator(
-                extractor.getPipeName(), extractor.getCreationTime()));
+                source.getPipeName(), source.getCreationTime()));
     if (Objects.nonNull(metricService)) {
       createMetrics(pipeID);
     }
   }
 
-  public void register(final IoTDBSchemaRegionSource extractor) {
+  public void register(final IoTDBSchemaRegionSource source) {
     // The metric is global thus the regionId is omitted
-    final String pipeID = extractor.getPipeName() + "_" + 
extractor.getCreationTime();
+    final String pipeID = source.getPipeName() + "_" + 
source.getCreationTime();
     remainingEventAndTimeOperatorMap
         .computeIfAbsent(
             pipeID,
             k ->
                 new PipeDataNodeRemainingEventAndTimeOperator(
-                    extractor.getPipeName(), extractor.getCreationTime()))
-        .register(extractor);
+                    source.getPipeName(), source.getCreationTime()))
+        .register(source);
     if (Objects.nonNull(metricService)) {
       createMetrics(pipeID);
     }
@@ -233,7 +233,7 @@ public class PipeDataNodeSinglePipeMetrics implements 
IMetricSet {
 
   public void decreaseInsertNodeEventCount(
       final String pipeName, final long creationTime, final long transferTime) 
{
-    PipeDataNodeRemainingEventAndTimeOperator operator =
+    final PipeDataNodeRemainingEventAndTimeOperator operator =
         remainingEventAndTimeOperatorMap.computeIfAbsent(
             pipeName + "_" + creationTime,
             k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSourceMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSourceMetrics.java
index 9c752d9753f..ce5d60449fd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSourceMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSourceMetrics.java
@@ -42,14 +42,14 @@ public class PipeSchemaRegionSourceMetrics implements 
IMetricSet {
   @SuppressWarnings("java:S3077")
   private volatile AbstractMetricService metricService;
 
-  private final Map<String, IoTDBSchemaRegionSource> extractorMap = new 
ConcurrentHashMap<>();
+  private final Map<String, IoTDBSchemaRegionSource> sourceMap = new 
ConcurrentHashMap<>();
 
   //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
 
   @Override
   public void bindTo(final AbstractMetricService metricService) {
     this.metricService = metricService;
-    ImmutableSet.copyOf(extractorMap.keySet()).forEach(this::createMetrics);
+    ImmutableSet.copyOf(sourceMap.keySet()).forEach(this::createMetrics);
   }
 
   private void createMetrics(final String taskID) {
@@ -57,24 +57,24 @@ public class PipeSchemaRegionSourceMetrics implements 
IMetricSet {
   }
 
   private void createAutoGauge(final String taskID) {
-    final IoTDBSchemaRegionSource extractor = extractorMap.get(taskID);
+    final IoTDBSchemaRegionSource source = sourceMap.get(taskID);
     metricService.createAutoGauge(
         Metric.UNTRANSFERRED_SCHEMA_COUNT.toString(),
         MetricLevel.IMPORTANT,
-        extractorMap.get(taskID),
+        sourceMap.get(taskID),
         IoTDBSchemaRegionSource::getUnTransferredEventCount,
         Tag.NAME.toString(),
-        extractor.getPipeName(),
+        source.getPipeName(),
         Tag.REGION.toString(),
-        String.valueOf(extractor.getRegionId()),
+        String.valueOf(source.getRegionId()),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(extractor.getCreationTime()));
+        String.valueOf(source.getCreationTime()));
   }
 
   @Override
   public void unbindFrom(final AbstractMetricService metricService) {
-    ImmutableSet.copyOf(extractorMap.keySet()).forEach(this::deregister);
-    if (!extractorMap.isEmpty()) {
+    ImmutableSet.copyOf(sourceMap.keySet()).forEach(this::deregister);
+    if (!sourceMap.isEmpty()) {
       LOGGER.warn(
           "Failed to unbind from pipe schema region extractor metrics, 
extractor map not empty");
     }
@@ -85,7 +85,7 @@ public class PipeSchemaRegionSourceMetrics implements 
IMetricSet {
   }
 
   private void removeAutoGauge(final String taskID) {
-    final IoTDBSchemaRegionSource extractor = extractorMap.get(taskID);
+    final IoTDBSchemaRegionSource extractor = sourceMap.get(taskID);
     // pending event count
     metricService.remove(
         MetricType.AUTO_GAUGE,
@@ -100,41 +100,41 @@ public class PipeSchemaRegionSourceMetrics implements 
IMetricSet {
 
   //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
 
-  public void register(final IoTDBSchemaRegionSource extractor) {
-    final String taskID = extractor.getTaskID();
-    extractorMap.putIfAbsent(taskID, extractor);
+  public void register(final IoTDBSchemaRegionSource source) {
+    final String taskID = source.getTaskID();
+    sourceMap.putIfAbsent(taskID, source);
     if (Objects.nonNull(metricService)) {
       createMetrics(taskID);
     }
   }
 
   public void deregister(final String taskID) {
-    if (!extractorMap.containsKey(taskID)) {
+    if (!sourceMap.containsKey(taskID)) {
       LOGGER.warn(
-          "Failed to deregister pipe schema region extractor metrics, 
IoTDBSchemaRegionExtractor({}) does not exist",
+          "Failed to deregister pipe schema region source metrics, 
IoTDBSchemaRegionSource({}) does not exist",
           taskID);
       return;
     }
     if (Objects.nonNull(metricService)) {
       removeMetrics(taskID);
     }
-    extractorMap.remove(taskID);
+    sourceMap.remove(taskID);
   }
 
   //////////////////////////// singleton ////////////////////////////
 
-  private static class PipeSchemaRegionExtractorMetricsHolder {
+  private static class PipeSchemaRegionSourceMetricsHolder {
 
     private static final PipeSchemaRegionSourceMetrics INSTANCE =
         new PipeSchemaRegionSourceMetrics();
 
-    private PipeSchemaRegionExtractorMetricsHolder() {
+    private PipeSchemaRegionSourceMetricsHolder() {
       // Empty constructor
     }
   }
 
   public static PipeSchemaRegionSourceMetrics getInstance() {
-    return PipeSchemaRegionExtractorMetricsHolder.INSTANCE;
+    return PipeSchemaRegionSourceMetricsHolder.INSTANCE;
   }
 
   private PipeSchemaRegionSourceMetrics() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 904b335ec58..867e2487a50 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -953,7 +953,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     }
 
     long userId = AuthorityChecker.getUserId(username).orElse(-1L);
-    Long timeToExpire = DataNodeAuthUtils.checkPasswordExpiration(userId, 
password);
+    Long timeToExpire = DataNodeAuthUtils.checkPasswordExpiration(userId, 
password, false);
     if (timeToExpire != null && timeToExpire <= System.currentTimeMillis()) {
       return RpcUtils.getStatus(
           TSStatusCode.ILLEGAL_PASSWORD.getStatusCode(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
index 9d476c54294..07b88a98053 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.pipe.api.exception.PipePasswordCheckException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -83,6 +84,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_CLI_HOSTNAME;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SKIP_IF_NO_PRIVILEGES;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_ID;
@@ -91,6 +93,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_USE_EVENT_USER_NAME_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_USE_EVENT_USER_NAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_CLI_HOSTNAME;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_ID;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY;
@@ -156,6 +159,8 @@ public class WriteBackSink implements PipeConnector {
             SINK_IOTDB_USER_KEY,
             CONNECTOR_IOTDB_USERNAME_KEY,
             SINK_IOTDB_USERNAME_KEY);
+    String passwordString =
+        parameters.getStringByKeys(CONNECTOR_IOTDB_PASSWORD_KEY, 
SINK_IOTDB_PASSWORD_KEY);
     String cliHostnameString =
         parameters.getStringByKeys(CONNECTOR_IOTDB_CLI_HOSTNAME, 
SINK_IOTDB_CLI_HOSTNAME);
     userEntity = new UserEntity(Long.parseLong(userIdString), usernameString, 
cliHostnameString);
@@ -191,6 +196,24 @@ public class WriteBackSink implements PipeConnector {
     if (SESSION_MANAGER.getCurrSession() == null) {
       SESSION_MANAGER.registerSession(session);
     }
+
+    // Check the password and its expiration
+    if (Objects.nonNull(passwordString)
+        && SESSION_MANAGER
+                .login(
+                    session,
+                    usernameString,
+                    passwordString,
+                    ZoneId.systemDefault().toString(),
+                    SessionManager.CURRENT_RPC_VERSION,
+                    IoTDBConstant.ClientVersion.V_1_0,
+                    IClientSession.SqlDialect.TREE,
+                    environment.getRegionId() >= 0)
+                .getCode()
+            != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipePasswordCheckException(
+          String.format("Failed to check password for pipe %s.", 
environment.getPipeName()));
+    }
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index d1ceb48e8aa..1c510be959b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.source.dataregion;
 
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
@@ -39,6 +40,9 @@ import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegio
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionLogSource;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionTsFileSource;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
@@ -50,11 +54,16 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.pipe.api.exception.PipePasswordCheckException;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicReference;
@@ -128,8 +137,8 @@ public class IoTDBDataRegionSource extends IoTDBSource {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBDataRegionSource.class);
 
-  private PipeHistoricalDataRegionSource historicalExtractor;
-  private PipeRealtimeDataRegionSource realtimeExtractor;
+  private PipeHistoricalDataRegionSource historicalSource;
+  private PipeRealtimeDataRegionSource realtimeSource;
 
   private DataRegionWatermarkInjector watermarkInjector;
 
@@ -294,11 +303,11 @@ public class IoTDBDataRegionSource extends IoTDBSource {
 
     checkInvalidParameters(validator);
 
-    constructHistoricalExtractor();
-    constructRealtimeExtractor(validator.getParameters());
+    constructHistoricalSource();
+    constructRealtimeSource(validator.getParameters());
 
-    historicalExtractor.validate(validator);
-    realtimeExtractor.validate(validator);
+    historicalSource.validate(validator);
+    realtimeSource.validate(validator);
   }
 
   private void validatePattern(final TreePattern treePattern) {
@@ -430,16 +439,16 @@ public class IoTDBDataRegionSource extends IoTDBSource {
     }
   }
 
-  private void constructHistoricalExtractor() {
-    historicalExtractor = new 
PipeHistoricalDataRegionTsFileAndDeletionSource();
+  private void constructHistoricalSource() {
+    historicalSource = new PipeHistoricalDataRegionTsFileAndDeletionSource();
   }
 
-  private void constructRealtimeExtractor(final PipeParameters parameters) {
+  private void constructRealtimeSource(final PipeParameters parameters) {
     // Use heartbeat only source if disable realtime source
     if (!parameters.getBooleanOrDefault(
         Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
         EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
-      realtimeExtractor = new PipeRealtimeDataRegionHeartbeatSource();
+      realtimeSource = new PipeRealtimeDataRegionHeartbeatSource();
       LOGGER.info(
           "Pipe: '{}' ('{}') is set to false, use heartbeat realtime source.",
           EXTRACTOR_REALTIME_ENABLE_KEY,
@@ -449,7 +458,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
 
     // Use heartbeat only source if enable snapshot mode
     if (PipeTaskAgent.isSnapshotMode(parameters)) {
-      realtimeExtractor = new PipeRealtimeDataRegionHeartbeatSource();
+      realtimeSource = new PipeRealtimeDataRegionHeartbeatSource();
       LOGGER.info("Pipe: snapshot mode is enabled, use heartbeat realtime 
source.");
       return;
     }
@@ -457,7 +466,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
     // Use hybrid mode by default
     if (!parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, 
SOURCE_MODE_STREAMING_KEY)
         && !parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY)) {
-      realtimeExtractor = new PipeRealtimeDataRegionHybridSource();
+      realtimeSource = new PipeRealtimeDataRegionHybridSource();
       LOGGER.info(
           "Pipe: '{}' ('{}') and '{}' ('{}') is not set, use hybrid mode by 
default.",
           EXTRACTOR_MODE_STREAMING_KEY,
@@ -473,9 +482,9 @@ public class IoTDBDataRegionSource extends IoTDBSource {
               Arrays.asList(EXTRACTOR_MODE_STREAMING_KEY, 
SOURCE_MODE_STREAMING_KEY),
               EXTRACTOR_MODE_STREAMING_DEFAULT_VALUE);
       if (isStreamingMode) {
-        realtimeExtractor = new PipeRealtimeDataRegionHybridSource();
+        realtimeSource = new PipeRealtimeDataRegionHybridSource();
       } else {
-        realtimeExtractor = new PipeRealtimeDataRegionTsFileSource();
+        realtimeSource = new PipeRealtimeDataRegionTsFileSource();
       }
       return;
     }
@@ -483,18 +492,18 @@ public class IoTDBDataRegionSource extends IoTDBSource {
     switch (parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY)) {
       case EXTRACTOR_REALTIME_MODE_FILE_VALUE:
       case EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE:
-        realtimeExtractor = new PipeRealtimeDataRegionTsFileSource();
+        realtimeSource = new PipeRealtimeDataRegionTsFileSource();
         break;
       case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE:
       case EXTRACTOR_REALTIME_MODE_LOG_VALUE:
       case EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE:
-        realtimeExtractor = new PipeRealtimeDataRegionHybridSource();
+        realtimeSource = new PipeRealtimeDataRegionHybridSource();
         break;
       case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE:
-        realtimeExtractor = new PipeRealtimeDataRegionLogSource();
+        realtimeSource = new PipeRealtimeDataRegionLogSource();
         break;
       default:
-        realtimeExtractor = new PipeRealtimeDataRegionHybridSource();
+        realtimeSource = new PipeRealtimeDataRegionHybridSource();
         if (LOGGER.isWarnEnabled()) {
           LOGGER.warn(
               "Pipe: Unsupported source realtime mode: {}, create a hybrid 
source.",
@@ -513,8 +522,8 @@ public class IoTDBDataRegionSource extends IoTDBSource {
 
     super.customize(parameters, configuration);
 
-    historicalExtractor.customize(parameters, configuration);
-    realtimeExtractor.customize(parameters, configuration);
+    historicalSource.customize(parameters, configuration);
+    realtimeSource.customize(parameters, configuration);
 
     // Set watermark injector
     long watermarkIntervalInMs = EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE;
@@ -542,8 +551,29 @@ public class IoTDBDataRegionSource extends IoTDBSource {
 
     // register metric after generating taskID
     PipeDataRegionSourceMetrics.getInstance().register(this);
-    PipeTsFileToTabletsMetrics.getInstance().register(this);
-    PipeDataNodeSinglePipeMetrics.getInstance().register(this);
+    if (regionId >= 0) {
+      PipeTsFileToTabletsMetrics.getInstance().register(this);
+      PipeDataNodeSinglePipeMetrics.getInstance().register(this);
+    }
+  }
+
+  @Override
+  protected void login(final @Nonnull String password) {
+    if (SessionManager.getInstance()
+            .login(
+                new InternalClientSession("Source_login_session_" + regionId),
+                userName,
+                password,
+                ZoneId.systemDefault().toString(),
+                SessionManager.CURRENT_RPC_VERSION,
+                IoTDBConstant.ClientVersion.V_1_0,
+                IClientSession.SqlDialect.TREE,
+                regionId >= 0)
+            .getCode()
+        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipePasswordCheckException(
+          String.format("Failed to check password for pipe %s.", pipeName));
+    }
   }
 
   @Override
@@ -557,8 +587,8 @@ public class IoTDBDataRegionSource extends IoTDBSource {
         "Pipe {}@{}: Starting historical source {} and realtime source {}.",
         pipeName,
         regionId,
-        historicalExtractor.getClass().getSimpleName(),
-        realtimeExtractor.getClass().getSimpleName());
+        historicalSource.getClass().getSimpleName(),
+        realtimeSource.getClass().getSimpleName());
 
     super.start();
 
@@ -591,8 +621,8 @@ public class IoTDBDataRegionSource extends IoTDBSource {
             "Pipe {}@{}: Started historical source {} and realtime source {} 
successfully within {} ms.",
             pipeName,
             regionId,
-            historicalExtractor.getClass().getSimpleName(),
-            realtimeExtractor.getClass().getSimpleName(),
+            historicalSource.getClass().getSimpleName(),
+            realtimeSource.getClass().getSimpleName(),
             System.currentTimeMillis() - startTime);
         return;
       }
@@ -603,21 +633,21 @@ public class IoTDBDataRegionSource extends IoTDBSource {
   private void startHistoricalExtractorAndRealtimeExtractor(
       final AtomicReference<Exception> exceptionHolder) {
     try {
-      // Start realtimeExtractor first to avoid losing data. This may cause 
some
+      // Start realtimeSource first to avoid losing data. This may cause some
       // retransmission, yet it is OK according to the idempotency of IoTDB.
       // Note: The order of historical collection is flushing data -> adding 
all tsFile events.
       // There can still be writing when tsFile events are added. If we start
-      // realtimeExtractor after the process, then this part of data will be 
lost.
-      realtimeExtractor.start();
-      historicalExtractor.start();
+      // realtimeSource after the process, then this part of data will be lost.
+      realtimeSource.start();
+      historicalSource.start();
     } catch (final Exception e) {
       exceptionHolder.set(e);
       LOGGER.warn(
           "Pipe {}@{}: Start historical source {} and realtime source {} 
error.",
           pipeName,
           regionId,
-          historicalExtractor.getClass().getSimpleName(),
-          realtimeExtractor.getClass().getSimpleName(),
+          historicalSource.getClass().getSimpleName(),
+          realtimeSource.getClass().getSimpleName(),
           e);
     }
   }
@@ -635,14 +665,14 @@ public class IoTDBDataRegionSource extends IoTDBSource {
     }
 
     Event event = null;
-    if (!historicalExtractor.hasConsumedAll()) {
-      event = historicalExtractor.supply();
+    if (!historicalSource.hasConsumedAll()) {
+      event = historicalSource.supply();
     } else {
       if (Objects.nonNull(watermarkInjector)) {
         event = watermarkInjector.inject();
       }
       if (Objects.isNull(event)) {
-        event = realtimeExtractor.supply();
+        event = realtimeSource.supply();
       }
     }
 
@@ -665,8 +695,8 @@ public class IoTDBDataRegionSource extends IoTDBSource {
       return;
     }
 
-    historicalExtractor.close();
-    realtimeExtractor.close();
+    historicalSource.close();
+    realtimeSource.close();
     if (Objects.nonNull(taskID)) {
       PipeDataRegionSourceMetrics.getInstance().deregister(taskID);
     }
@@ -675,20 +705,20 @@ public class IoTDBDataRegionSource extends IoTDBSource {
   //////////////////////////// APIs provided for metric framework 
////////////////////////////
 
   public int getHistoricalTsFileInsertionEventCount() {
-    return hasBeenStarted.get() && Objects.nonNull(historicalExtractor)
-        ? historicalExtractor.getPendingQueueSize()
+    return hasBeenStarted.get() && Objects.nonNull(historicalSource)
+        ? historicalSource.getPendingQueueSize()
         : 0;
   }
 
   public int getTabletInsertionEventCount() {
-    return hasBeenStarted.get() ? 
realtimeExtractor.getTabletInsertionEventCount() : 0;
+    return hasBeenStarted.get() ? 
realtimeSource.getTabletInsertionEventCount() : 0;
   }
 
   public int getRealtimeTsFileInsertionEventCount() {
-    return hasBeenStarted.get() ? 
realtimeExtractor.getTsFileInsertionEventCount() : 0;
+    return hasBeenStarted.get() ? 
realtimeSource.getTsFileInsertionEventCount() : 0;
   }
 
   public int getPipeHeartbeatEventCount() {
-    return hasBeenStarted.get() ? 
realtimeExtractor.getPipeHeartbeatEventCount() : 0;
+    return hasBeenStarted.get() ? realtimeSource.getPipeHeartbeatEventCount() 
: 0;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 8aac47b01c7..321458e156b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -57,6 +57,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -305,17 +306,17 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSource
       return;
     }
 
-    final PipeTaskSourceRuntimeEnvironment environment =
-        (PipeTaskSourceRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
+    final PipeRuntimeEnvironment environment = 
configuration.getRuntimeEnvironment();
 
     pipeName = environment.getPipeName();
     creationTime = environment.getCreationTime();
-    pipeTaskMeta = environment.getPipeTaskMeta();
-    if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
-      startIndex =
-          
tryToExtractLocalProgressIndexForIoTV2(environment.getPipeTaskMeta().getProgressIndex());
-    } else {
-      startIndex = environment.getPipeTaskMeta().getProgressIndex();
+    if (environment instanceof PipeTaskSourceRuntimeEnvironment) {
+      pipeTaskMeta = ((PipeTaskSourceRuntimeEnvironment) 
environment).getPipeTaskMeta();
+      if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
+        startIndex = 
tryToExtractLocalProgressIndexForIoTV2(pipeTaskMeta.getProgressIndex());
+      } else {
+        startIndex = pipeTaskMeta.getProgressIndex();
+      }
     }
 
     dataRegionId = environment.getRegionId();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index de0ee264473..204c9c87d14 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -47,6 +47,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.pipe.api.PipeExtractor;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -205,8 +206,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
   public void customize(
       final PipeParameters parameters, final PipeExtractorRuntimeConfiguration 
configuration)
       throws Exception {
-    final PipeTaskSourceRuntimeEnvironment environment =
-        (PipeTaskSourceRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
+    final PipeRuntimeEnvironment environment = 
configuration.getRuntimeEnvironment();
 
     final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
         
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters);
@@ -215,7 +215,9 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
 
     pipeName = environment.getPipeName();
     dataRegionId = environment.getRegionId();
-    pipeTaskMeta = environment.getPipeTaskMeta();
+    if (environment instanceof PipeTaskSourceRuntimeEnvironment) {
+      pipeTaskMeta = ((PipeTaskSourceRuntimeEnvironment) 
environment).getPipeTaskMeta();
+    }
 
     // Metrics related to TsFileEpoch are managed in PipeExtractorMetrics. 
These metrics are
     // indexed by the taskID of IoTDBDataRegionExtractor. To avoid 
PipeRealtimeDataRegionExtractor
@@ -302,7 +304,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
 
     if (LOGGER.isInfoEnabled()) {
       LOGGER.info(
-          "Pipe {}@{}: realtime data region extractor is initialized with 
parameters: {}.",
+          "Pipe {}@{}: realtime data region source is initialized with 
parameters: {}.",
           pipeName,
           dataRegionId,
           parameters);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
index b70ff4b7d8d..f743e1dc7cc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
@@ -42,6 +42,9 @@ import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEve
 import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionSourceMetrics;
 import 
org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementToBatchVisitor;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -58,12 +61,16 @@ import org.apache.iotdb.pipe.api.annotation.TreeModel;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipePasswordCheckException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.common.constant.TsFileConstant;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.time.ZoneId;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -107,7 +114,7 @@ public class IoTDBSchemaRegionSource extends 
IoTDBNonDataRegionSource {
         .getSchemaRegionConsensusProtocolClass()
         .equals(ConsensusFactory.SIMPLE_CONSENSUS)) {
       throw new PipeException(
-          "IoTDBSchemaRegionExtractor does not transferring events under 
simple consensus");
+          "IoTDBSchemaRegionSource does not support transferring events under 
simple consensus");
     }
 
     super.customize(parameters, configuration);
@@ -117,7 +124,9 @@ public class IoTDBSchemaRegionSource extends 
IoTDBNonDataRegionSource {
     treePrivilegeParseVisitor = new 
PipePlanTreePrivilegeParseVisitor(skipIfNoPrivileges);
 
     PipeSchemaRegionSourceMetrics.getInstance().register(this);
-    PipeDataNodeSinglePipeMetrics.getInstance().register(this);
+    if (regionId >= 0) {
+      PipeDataNodeSinglePipeMetrics.getInstance().register(this);
+    }
   }
 
   @Override
@@ -146,6 +155,25 @@ public class IoTDBSchemaRegionSource extends 
IoTDBNonDataRegionSource {
     super.start();
   }
 
+  @Override
+  protected void login(final @Nonnull String password) {
+    if (SessionManager.getInstance()
+            .login(
+                new InternalClientSession("Source_login_session_" + regionId),
+                userName,
+                password,
+                ZoneId.systemDefault().toString(),
+                SessionManager.CURRENT_RPC_VERSION,
+                IoTDBConstant.ClientVersion.V_1_0,
+                IClientSession.SqlDialect.TREE,
+                regionId >= 0)
+            .getCode()
+        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipePasswordCheckException(
+          String.format("Failed to check password for pipe %s.", pipeName));
+    }
+  }
+
   @Override
   protected boolean needTransferSnapshot() {
     // Note: the schema region will transfer snapshot if there are table or 
tree planNode captured.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/SchemaRegionListeningFilter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/SchemaRegionListeningFilter.java
index ad37f27ef95..b0501d00ae6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/SchemaRegionListeningFilter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/SchemaRegionListeningFilter.java
@@ -121,7 +121,12 @@ public class SchemaRegionListeningFilter {
                 .getDatabaseFullPath());
     return (TreePattern.isTreeModelDataAllowToBeCaptured(parameters) && 
!isTableModel
             || TablePattern.isTableModelDataAllowToBeCaptured(parameters) && 
isTableModel)
-        && !parseListeningPlanTypeSet(parameters).isEmpty();
+        && shouldSchemaRegionBeListened(parameters);
+  }
+
+  public static boolean shouldSchemaRegionBeListened(final PipeParameters 
parameters)
+      throws IllegalPathException {
+    return !parseListeningPlanTypeSet(parameters).isEmpty();
   }
 
   public static Set<PlanNodeType> parseListeningPlanTypeSet(final 
PipeParameters parameters)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java
index 975cbb09437..d6cdfa63a32 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java
@@ -42,7 +42,7 @@ public class InternalClientSession extends IClientSession {
 
   @Override
   public String getClientAddress() {
-    return clientID;
+    return "";
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
index ee00655211f..ac7cc236d9f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
@@ -130,11 +130,26 @@ public class SessionManager implements 
SessionManagerMBean {
       TSProtocolVersion tsProtocolVersion,
       IoTDBConstant.ClientVersion clientVersion,
       IClientSession.SqlDialect sqlDialect) {
-    BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp();
-
-    long userId = AuthorityChecker.getUserId(username).orElse(-1L);
+    return login(
+        session, username, password, zoneId, tsProtocolVersion, clientVersion, 
sqlDialect, false);
+  }
 
-    Long timeToExpire = DataNodeAuthUtils.checkPasswordExpiration(userId, 
password);
+  // Only pipe can set useEncryptedPassword to true
+  public BasicOpenSessionResp login(
+      final IClientSession session,
+      final String username,
+      final String password,
+      final String zoneId,
+      final TSProtocolVersion tsProtocolVersion,
+      final IoTDBConstant.ClientVersion clientVersion,
+      final IClientSession.SqlDialect sqlDialect,
+      final boolean useEncryptedPassword) {
+    final BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp();
+
+    final long userId = AuthorityChecker.getUserId(username).orElse(-1L);
+
+    Long timeToExpire =
+        DataNodeAuthUtils.checkPasswordExpiration(userId, password, 
useEncryptedPassword);
     if (timeToExpire != null && timeToExpire <= System.currentTimeMillis()) {
       openSessionResp
           .sessionId(-1)
@@ -154,7 +169,8 @@ public class SessionManager implements SessionManagerMBean {
       return openSessionResp;
     }
 
-    TSStatus loginStatus = AuthorityChecker.checkUser(username, password);
+    final TSStatus loginStatus =
+        AuthorityChecker.checkUser(username, password, useEncryptedPassword);
     if (loginStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       // check the version compatibility
       if (!tsProtocolVersion.equals(CURRENT_RPC_VERSION)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
index 60deee79509..1ea10ef37e8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
@@ -683,16 +683,15 @@ public class TreeConfigTaskVisitor extends 
StatementVisitor<IConfigTask, MPPQuer
     alterPipeStatement.setUserName(userName);
 
     final String pipeName = alterPipeStatement.getPipeName();
-    final Map<String, String> extractorAttributes = 
alterPipeStatement.getSourceAttributes();
+    final Map<String, String> sourceAttributes = 
alterPipeStatement.getSourceAttributes();
 
     // If the source is replaced, sql-dialect uses the current Alter Pipe 
sql-dialect. If it is
     // modified, the original sql-dialect is used.
     if (alterPipeStatement.isReplaceAllSourceAttributes()) {
-      extractorAttributes.put(
-          SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TREE_VALUE);
+      sourceAttributes.put(SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TREE_VALUE);
       checkAndEnrichSourceUser(
           pipeName,
-          extractorAttributes,
+          sourceAttributes,
           new UserEntity(context.getUserId(), context.getUsername(), 
context.getCliHostname()),
           true);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index cb45757d3be..404990e124d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -328,6 +328,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.Compacti
 import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
 import org.apache.iotdb.pipe.api.PipePlugin;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.exception.PipePasswordCheckException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -372,6 +373,7 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -994,7 +996,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       future.setException(
           new IoTDBException(
               String.format("Failed to create pipe plugin %s. " + pathError, 
pluginName),
-              TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode()));
+              TSStatusCode.SEMANTIC_ERROR.getStatusCode()));
       return future;
     }
 
@@ -1002,7 +1004,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       future.setException(
           new IoTDBException(
               "Failed to create pipe plugin, because the URI is empty.",
-              TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
+              TSStatusCode.SEMANTIC_ERROR.getStatusCode()));
       return future;
     }
 
@@ -1019,7 +1021,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
           future.setException(
               new IoTDBException(
                   "The scheme of URI is not set, please specify the scheme of 
URI.",
-                  TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
+                  TSStatusCode.SEMANTIC_ERROR.getStatusCode()));
           return future;
         }
         if (!uri.getScheme().equals("file")) {
@@ -1059,7 +1061,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                 "Failed to get executable for PipePlugin "
                     + createPipePluginStatement.getPluginName()
                     + ", please check the URI.",
-                TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
+                TSStatusCode.SEMANTIC_ERROR.getStatusCode()));
         return future;
       }
 
@@ -2175,7 +2177,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       future.setException(
           new IoTDBException(
               String.format("Failed to create pipe %s, " + pathError, 
pipeName),
-              TSStatusCode.PIPE_ERROR.getStatusCode()));
+              TSStatusCode.SEMANTIC_ERROR.getStatusCode()));
       return future;
     }
 
@@ -2189,7 +2191,11 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
               createPipeStatement.getSinkAttributes());
     } catch (final Exception e) {
       future.setException(
-          new IoTDBException(e.getMessage(), 
TSStatusCode.PIPE_ERROR.getStatusCode()));
+          new IoTDBException(
+              e.getMessage(),
+              e instanceof PipePasswordCheckException
+                  ? TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode()
+                  : TSStatusCode.PIPE_ERROR.getStatusCode()));
       return future;
     }
 
@@ -2353,6 +2359,8 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       return future;
     }
 
+    boolean hasSourcePassword = false;
+    boolean hasSinkPassword = false;
     // Construct temporary pipe static meta for validation
     final String pipeName = alterPipeStatement.getPipeName();
     final Map<String, String> sourceAttributes;
@@ -2381,9 +2389,12 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
           sourceAttributes =
               
pipeMetaFromCoordinator.getStaticMeta().getSourceParameters().getAttribute();
           if (onlyContainsUser) {
-            checkSourceType(alterPipeStatement.getPipeName(), 
sourceAttributes);
+            checkSourceTypeWithException(alterPipeStatement.getPipeName(), 
sourceAttributes);
           }
         }
+        hasSourcePassword =
+            !checkSourceType(sourceAttributes)
+                || containsPassword(alterPipeStatement.getSourceAttributes());
       } else {
         sourceAttributes =
             
pipeMetaFromCoordinator.getStaticMeta().getSourceParameters().getAttribute();
@@ -2407,6 +2418,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       }
 
       if (!alterPipeStatement.getSinkAttributes().isEmpty()) {
+        // Do not remove for handshake
         if (alterPipeStatement.isReplaceAllSinkAttributes()) {
           sinkAttributes = alterPipeStatement.getSinkAttributes();
         } else {
@@ -2419,18 +2431,39 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
           sinkAttributes =
               
pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute();
           if (onlyContainsUser) {
-            checkSinkType(alterPipeStatement.getPipeName(), sinkAttributes);
+            checkSinkTypeWithException(alterPipeStatement.getPipeName(), 
sinkAttributes);
           }
         }
+        hasSinkPassword =
+            !checkSinkType(sinkAttributes)
+                || containsPassword(alterPipeStatement.getSinkAttributes());
       } else {
         sinkAttributes = 
pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute();
       }
 
+      final Map<String, String> checkedSource = new 
HashMap<>(sourceAttributes);
+      if (!hasSourcePassword) {
+        checkedSource.remove(PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY);
+        checkedSource.remove(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY);
+        checkedSource.remove(
+            
PipeParameters.KeyReducer.reduce(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
+      }
+      final Map<String, String> checkedSink = new HashMap<>(sinkAttributes);
+      if (!hasSinkPassword) {
+        checkedSink.remove(PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY);
+        checkedSink.remove(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY);
+        checkedSink.remove(
+            
PipeParameters.KeyReducer.reduce(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY));
+      }
       PipeDataNodeAgent.plugin()
-          .validate(pipeName, sourceAttributes, processorAttributes, 
sinkAttributes);
+          .validate(pipeName, checkedSource, processorAttributes, checkedSink);
     } catch (final Exception e) {
       future.setException(
-          new IoTDBException(e.getMessage(), 
TSStatusCode.PIPE_ERROR.getStatusCode()));
+          new IoTDBException(
+              e.getMessage(),
+              e instanceof PipePasswordCheckException
+                  ? TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode()
+                  : TSStatusCode.PIPE_ERROR.getStatusCode()));
       return future;
     }
 
@@ -2481,56 +2514,68 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     }
   }
 
-  private static void checkSourceType(
-      final String pipeName, final Map<String, String> 
replacedExtractorAttributes) {
-    final PipeParameters extractorParameters = new 
PipeParameters(replacedExtractorAttributes);
+  private static void checkSourceTypeWithException(
+      final String pipeName, final Map<String, String> 
replacedSourceAttributes) {
+    if (checkSourceType(replacedSourceAttributes)) {
+      throw new SemanticException(
+          String.format(
+              "Failed to alter pipe %s, in iotdb-source, password must be set 
when the username is specified.",
+              pipeName));
+    }
+  }
+
+  private static boolean checkSourceType(final Map<String, String> 
replacedSourceAttributes) {
+    final PipeParameters sourceParameters = new 
PipeParameters(replacedSourceAttributes);
     final String pluginName =
-        extractorParameters
+        sourceParameters
             .getStringOrDefault(
                 Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY, 
PipeSourceConstant.SOURCE_KEY),
                 BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
             .toLowerCase();
 
-    if 
(pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
-        || 
pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName())) {
-      throw new SemanticException(
-          String.format(
-              "Failed to alter pipe %s, in iotdb-source, password must be set 
when the username is specified.",
-              pipeName));
-    }
+    return 
pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+        || 
pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName());
+  }
+
+  private static boolean containsPassword(final Map<String, String> 
sourceOrSinkAttributes) {
+    final PipeParameters sourceOrSinkParameters = new 
PipeParameters(sourceOrSinkAttributes);
+    return sourceOrSinkParameters.hasAnyAttributes(
+        PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY, 
PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY);
   }
 
-  private static boolean onlyContainsUser(
-      final Map<String, String> extractorOrConnectorAttributes) {
-    final PipeParameters extractorOrConnectorParameters =
-        new PipeParameters(extractorOrConnectorAttributes);
-    return extractorOrConnectorParameters.hasAnyAttributes(
+  private static boolean onlyContainsUser(final Map<String, String> 
sourceOrSinkAttributes) {
+    final PipeParameters sourceOrSinkParameters = new 
PipeParameters(sourceOrSinkAttributes);
+    return sourceOrSinkParameters.hasAnyAttributes(
             PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY,
             PipeSinkConstant.SINK_IOTDB_USER_KEY,
             PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY,
             PipeSinkConstant.SINK_IOTDB_USERNAME_KEY)
-        && !extractorOrConnectorParameters.hasAnyAttributes(
+        && !sourceOrSinkParameters.hasAnyAttributes(
             PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY,
             PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY);
   }
 
-  private static void checkSinkType(
-      final String pipeName, final Map<String, String> connectorAttributes) {
-    final PipeParameters connectorParameters = new 
PipeParameters(connectorAttributes);
+  private static void checkSinkTypeWithException(
+      final String pipeName, final Map<String, String> sinkAttributes) {
+    if (checkSinkType(sinkAttributes)) {
+      throw new SemanticException(
+          String.format(
+              "Failed to alter pipe %s, in write-back-sink, password must be 
set when the username is specified.",
+              pipeName));
+    }
+  }
+
+  private static boolean checkSinkType(final Map<String, String> 
sinkAttributes) {
+    final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
     final String pluginName =
-        connectorParameters
+        sinkParameters
             .getStringOrDefault(
                 Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
                 BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
             .toLowerCase();
 
-    if 
(pluginName.equals(BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName())
-        || 
pluginName.equals(BuiltinPipePlugin.WRITE_BACK_SINK.getPipePluginName())) {
-      throw new SemanticException(
-          String.format(
-              "Failed to alter pipe %s, in write-back-sink, password must be 
set when the username is specified.",
-              pipeName));
-    }
+    return 
pluginName.equals(BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName())
+        || 
pluginName.equals(BuiltinPipePlugin.WRITE_BACK_SINK.getPipePluginName());
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
index aecadff7f81..9b5857d2635 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
@@ -75,25 +75,25 @@ public class AlterPipeTask implements IConfigTask {
             
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
 
     // support now() function
-    PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+    PipeFunctionSupport.applyNowFunction2SourceAttributes(
         attributes,
         PipeSourceConstant.SOURCE_START_TIME_KEY,
         PipeSourceConstant.EXTRACTOR_START_TIME_KEY,
         currentTime);
 
-    PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+    PipeFunctionSupport.applyNowFunction2SourceAttributes(
         attributes,
         PipeSourceConstant.SOURCE_END_TIME_KEY,
         PipeSourceConstant.EXTRACTOR_END_TIME_KEY,
         currentTime);
 
-    PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+    PipeFunctionSupport.applyNowFunction2SourceAttributes(
         attributes,
         PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY,
         PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY,
         currentTime);
 
-    PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+    PipeFunctionSupport.applyNowFunction2SourceAttributes(
         attributes,
         PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY,
         PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
index a5fbd36f88d..fc3884cb27e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
@@ -37,19 +37,19 @@ public class CreatePipeTask implements IConfigTask {
 
   private final CreatePipeStatement createPipeStatement;
 
-  public CreatePipeTask(CreatePipeStatement createPipeStatement) {
+  public CreatePipeTask(final CreatePipeStatement createPipeStatement) {
     // support now() function
-    
applyNowFunctionToExtractorAttributes(createPipeStatement.getSourceAttributes());
+    
applyNowFunction2SourceAttributes(createPipeStatement.getSourceAttributes());
     this.createPipeStatement = createPipeStatement;
   }
 
-  public CreatePipeTask(CreatePipe createPipe) {
+  public CreatePipeTask(final CreatePipe createPipe) {
     createPipeStatement = new CreatePipeStatement(StatementType.CREATE_PIPE);
     createPipeStatement.setPipeName(createPipe.getPipeName());
     createPipeStatement.setIfNotExists(createPipe.hasIfNotExistsCondition());
 
     // support now() function
-    applyNowFunctionToExtractorAttributes(createPipe.getSourceAttributes());
+    applyNowFunction2SourceAttributes(createPipe.getSourceAttributes());
 
     createPipeStatement.setSourceAttributes(createPipe.getSourceAttributes());
     
createPipeStatement.setProcessorAttributes(createPipe.getProcessorAttributes());
@@ -57,37 +57,37 @@ public class CreatePipeTask implements IConfigTask {
   }
 
   @Override
-  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
+  public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
     return configTaskExecutor.createPipe(createPipeStatement);
   }
 
-  private void applyNowFunctionToExtractorAttributes(final Map<String, String> 
attributes) {
+  private void applyNowFunction2SourceAttributes(final Map<String, String> 
attributes) {
     final long currentTime =
         CommonDateTimeUtils.convertMilliTimeWithPrecision(
             System.currentTimeMillis(),
             
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
 
     // support now() function
-    PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+    PipeFunctionSupport.applyNowFunction2SourceAttributes(
         attributes,
         PipeSourceConstant.SOURCE_START_TIME_KEY,
         PipeSourceConstant.EXTRACTOR_START_TIME_KEY,
         currentTime);
 
-    PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+    PipeFunctionSupport.applyNowFunction2SourceAttributes(
         attributes,
         PipeSourceConstant.SOURCE_END_TIME_KEY,
         PipeSourceConstant.EXTRACTOR_END_TIME_KEY,
         currentTime);
 
-    PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+    PipeFunctionSupport.applyNowFunction2SourceAttributes(
         attributes,
         PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY,
         PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY,
         currentTime);
 
-    PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+    PipeFunctionSupport.applyNowFunction2SourceAttributes(
         attributes,
         PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY,
         PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java
index 0edc18c3f00..e9fb09e8750 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java
@@ -31,7 +31,7 @@ public class PipeFunctionSupport {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeFunctionSupport.class);
 
-  public static void applyNowFunctionToExtractorAttributes(
+  public static void applyNowFunction2SourceAttributes(
       final Map<String, String> extractorAttributes,
       final String sourceKey,
       final String extractorKey,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java
index 187575c58da..6ee378282ad 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java
@@ -275,7 +275,8 @@ public class DataNodeAuthUtils {
    * @return the timestamp when the password will expire. Long.MAX if the 
password never expires.
    *     Null if the password history cannot be found.
    */
-  public static Long checkPasswordExpiration(long userId, String password) {
+  public static Long checkPasswordExpiration(
+      final long userId, final String password, final boolean 
useEncryptedPassword) {
     if (userId == -1) {
       return null;
     }
@@ -335,7 +336,8 @@ public class DataNodeAuthUtils {
             
CommonDateTimeUtils.convertIoTDBTimeToMillis(tsBlock.getTimeByIndex(0));
         // columns of last query: [timeseriesName, value, dataType]
         String oldPassword = tsBlock.getColumn(1).getBinary(0).toString();
-        if (oldPassword.equals(AuthUtils.encryptPassword(password))) {
+        if (oldPassword.equals(
+            useEncryptedPassword ? password : 
AuthUtils.encryptPassword(password))) {
           if (lastPasswordTime + passwordExpirationDays * 1000 * 86400 <= 
lastPasswordTime) {
             // overflow or passwordExpirationDays <= 0
             return Long.MAX_VALUE;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/LocalFileAuthorizerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/LocalFileAuthorizerTest.java
index 9d0acaf14d8..194f33e8d67 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/LocalFileAuthorizerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/LocalFileAuthorizerTest.java
@@ -64,8 +64,8 @@ public class LocalFileAuthorizerTest {
 
   @Test
   public void testLogin() throws AuthException {
-    Assert.assertTrue(authorizer.login("root", "root"));
-    Assert.assertThrows(AuthException.class, () -> authorizer.login("root", 
"error"));
+    Assert.assertTrue(authorizer.login("root", "root", false));
+    Assert.assertThrows(AuthException.class, () -> authorizer.login("root", 
"error", false));
   }
 
   @Test
@@ -76,7 +76,7 @@ public class LocalFileAuthorizerTest {
     } catch (AuthException e) {
       assertEquals("User user already exists", e.getMessage());
     }
-    Assert.assertTrue(authorizer.login(userName, password));
+    Assert.assertTrue(authorizer.login(userName, password, false));
     authorizer.deleteUser(userName);
     try {
       authorizer.deleteUser(userName);
@@ -230,7 +230,7 @@ public class LocalFileAuthorizerTest {
   public void testUpdatePassword() throws AuthException {
     authorizer.createUser(userName, password);
     authorizer.updateUserPassword(userName, "newPassword123456");
-    Assert.assertTrue(authorizer.login(userName, "newPassword123456"));
+    Assert.assertTrue(authorizer.login(userName, "newPassword123456", false));
   }
 
   @Test
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/OpenIdAuthorizerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/OpenIdAuthorizerTest.java
index bd0c2d399e5..196cc80e5b6 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/OpenIdAuthorizerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/OpenIdAuthorizerTest.java
@@ -60,7 +60,7 @@ public class OpenIdAuthorizerTest {
         
"eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJxMS1XbTBvelE1TzBtUUg4LVNKYXAyWmNONE1tdWNXd25RV0tZeFpKNG93In0.eyJleHAiOjE1OTAzMTcxNzYsImlhdCI6MTU5MDMxNjg3NiwianRpIjoiY2MyNWQ3MDAtYjc5NC00OTA4LTg0OGUtOTRhNzYzNmM5YzQxIiwiaXNzIjoiaHR0cDovL2F1dGguZGVtby5wcmFnbWF0aWNpbmR1c3RyaWVzLmRlL2F1dGgvcmVhbG1zL0lvVERCIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6Ijg2YWRmNGIzLWE4ZTUtNDc1NC1iNWEwLTQ4OGI0OWY0M2VkMiIsInR5cCI6IkJlYXJlciIsImF6cCI6ImlvdGRiIiwic2Vzc2lvbl9zdGF0ZSI6Ijk0ZmI5NGZjLTg3YTMtNDg4Ny04M2Q3LWE
 [...]
 
     OpenIdAuthorizer authorizer = new 
OpenIdAuthorizer(JSONObjectUtils.parse(OPEN_ID_PUBLIC_JWK));
-    boolean login = authorizer.login(jwt, null);
+    boolean login = authorizer.login(jwt, null, false);
 
     assertTrue(login);
   }
@@ -99,14 +99,16 @@ public class OpenIdAuthorizerTest {
     boolean login =
         openIdAuthorizer.login(
             
"eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJxMS1XbTBvelE1TzBtUUg4LVNKYXAyWmNONE1tdWNXd25RV0tZeFpKNG93In0.eyJleHAiOjE1OTAzMTcxNzYsImlhdCI6MTU5MDMxNjg3NiwianRpIjoiY2MyNWQ3MDAtYjc5NC00OTA4LTg0OGUtOTRhNzYzNmM5YzQxIiwiaXNzIjoiaHR0cDovL2F1dGguZGVtby5wcmFnbWF0aWNpbmR1c3RyaWVzLmRlL2F1dGgvcmVhbG1zL0lvVERCIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6Ijg2YWRmNGIzLWE4ZTUtNDc1NC1iNWEwLTQ4OGI0OWY0M2VkMiIsInR5cCI6IkJlYXJlciIsImF6cCI6ImlvdGRiIiwic2Vzc2lvbl9zdGF0ZSI6Ijk0ZmI5NGZjLTg3YTMtNDg4Ny04M2Q
 [...]
-            "");
+            "",
+            false);
     assertTrue(login);
     
config.setOpenIdProviderUrl("https://auth.demo.pragmaticindustries.de/auth/realms/IoTDB/";);
     OpenIdAuthorizer openIdAuthorizer1 = new OpenIdAuthorizer();
     login =
         openIdAuthorizer1.login(
             
"eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJxMS1XbTBvelE1TzBtUUg4LVNKYXAyWmNONE1tdWNXd25RV0tZeFpKNG93In0.eyJleHAiOjE1OTAzMTcxNzYsImlhdCI6MTU5MDMxNjg3NiwianRpIjoiY2MyNWQ3MDAtYjc5NC00OTA4LTg0OGUtOTRhNzYzNmM5YzQxIiwiaXNzIjoiaHR0cDovL2F1dGguZGVtby5wcmFnbWF0aWNpbmR1c3RyaWVzLmRlL2F1dGgvcmVhbG1zL0lvVERCIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6Ijg2YWRmNGIzLWE4ZTUtNDc1NC1iNWEwLTQ4OGI0OWY0M2VkMiIsInR5cCI6IkJlYXJlciIsImF6cCI6ImlvdGRiIiwic2Vzc2lvbl9zdGF0ZSI6Ijk0ZmI5NGZjLTg3YTMtNDg4Ny04M2Q
 [...]
-            "");
+            "",
+            false);
     assertTrue(login);
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java
index 98f70dc0ee6..6ba8c5336b1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java
@@ -107,12 +107,17 @@ public abstract class BasicAuthorizer implements 
IAuthorizer, IService {
   }
 
   @Override
-  public boolean login(String username, String password) throws AuthException {
+  public boolean login(
+      final String username, final String password, final boolean 
useEncryptedPassword)
+      throws AuthException {
     User user = userManager.getEntity(username);
     if (user == null || password == null) {
       throw new AuthException(
           TSStatusCode.USER_NOT_EXIST, String.format("The user %s does not 
exist.", username));
     }
+    if (useEncryptedPassword) {
+      return password.equals(user.getPassword());
+    }
     if (AuthUtils.validatePassword(
         password, user.getPassword(), 
AsymmetricEncrypt.DigestAlgorithm.SHA_256)) {
       return true;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java
index 445b29c0790..3ac33dbeddd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java
@@ -42,7 +42,8 @@ public interface IAuthorizer extends SnapshotProcessor {
    * @param password The password of the user.
    * @return True if such user exists and the given password is correct, else 
return false.
    */
-  boolean login(String username, String password) throws AuthException;
+  boolean login(String username, String password, final boolean 
useEncryptedPassword)
+      throws AuthException;
 
   /**
    * Login for a user in pipe.
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/OpenIdAuthorizer.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/OpenIdAuthorizer.java
index 2da1acfaee9..ee66ee5bced 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/OpenIdAuthorizer.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/OpenIdAuthorizer.java
@@ -145,7 +145,8 @@ public class OpenIdAuthorizer extends BasicAuthorizer {
   }
 
   @Override
-  public boolean login(String token, String password) throws AuthException {
+  public boolean login(String token, String password, final boolean 
useEncryptedPassword)
+      throws AuthException {
     if (password != null && !password.isEmpty()) {
       logger.error(
           "JWT Login failed as a non-empty Password was given username 
(token): {}", token);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
index 519d263449b..45a1acbfed3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
@@ -87,28 +87,31 @@ public abstract class PipePluginAgent {
       final Map<String, String> processorAttributes,
       final Map<String, String> sinkAttributes)
       throws Exception {
-    validateSource(sourceAttributes);
+    validateSource(pipeName, sourceAttributes);
     validateProcessor(processorAttributes);
     validateSink(pipeName, sinkAttributes);
   }
 
-  protected PipeExtractor validateSource(final Map<String, String> 
sourceAttributes)
-      throws Exception {
+  protected PipeExtractor validateSource(
+      final String pipeName, final Map<String, String> sourceAttributes) 
throws Exception {
     final PipeParameters sourceParameters = new 
PipeParameters(sourceAttributes);
-    final PipeExtractor temporaryExtractor = reflectSource(sourceParameters);
+    final PipeExtractor temporarySource = reflectSource(sourceParameters);
     try {
-      temporaryExtractor.validate(new 
PipeParameterValidator(sourceParameters));
+      temporarySource.validate(new PipeParameterValidator(sourceParameters));
+      temporarySource.customize(
+          sourceParameters,
+          new PipeTaskRuntimeConfiguration(new 
PipeTaskTemporaryRuntimeEnvironment(pipeName)));
     } finally {
       try {
-        temporaryExtractor.close();
-      } catch (Exception e) {
+        temporarySource.close();
+      } catch (final Exception e) {
         LOGGER.warn("Failed to close temporary source: {}", e.getMessage(), e);
       }
     }
-    return temporaryExtractor;
+    return temporarySource;
   }
 
-  protected PipeProcessor validateProcessor(Map<String, String> 
processorAttributes)
+  protected PipeProcessor validateProcessor(final Map<String, String> 
processorAttributes)
       throws Exception {
     final PipeParameters processorParameters = new 
PipeParameters(processorAttributes);
     final PipeProcessor temporaryProcessor = 
reflectProcessor(processorParameters);
@@ -117,15 +120,15 @@ public abstract class PipePluginAgent {
     } finally {
       try {
         temporaryProcessor.close();
-      } catch (Exception e) {
+      } catch (final Exception e) {
         LOGGER.warn("Failed to close temporary processor: {}", e.getMessage(), 
e);
       }
     }
     return temporaryProcessor;
   }
 
-  protected PipeConnector validateSink(String pipeName, Map<String, String> 
sinkAttributes)
-      throws Exception {
+  protected PipeConnector validateSink(
+      final String pipeName, final Map<String, String> sinkAttributes) throws 
Exception {
     final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
     final PipeConnector temporarySink = reflectSink(sinkParameters);
     try {
@@ -137,7 +140,7 @@ public abstract class PipePluginAgent {
     } finally {
       try {
         temporarySink.close();
-      } catch (Exception e) {
+      } catch (final Exception e) {
         LOGGER.warn("Failed to close temporary connector: {}", e.getMessage(), 
e);
       }
     }
@@ -154,7 +157,7 @@ public abstract class PipePluginAgent {
    * @throws PipeException if any exception occurs
    */
   public final List<String> getSubProcessorNamesWithSpecifiedParent(
-      Class<? extends PipeProcessor> parentClass) throws PipeException {
+      final Class<? extends PipeProcessor> parentClass) throws PipeException {
     return 
StreamSupport.stream(pipePluginMetaKeeper.getAllPipePluginMeta().spliterator(), 
false)
         .map(pipePluginMeta -> pipePluginMeta.getPluginName().toLowerCase())
         .filter(
@@ -162,7 +165,7 @@ public abstract class PipePluginAgent {
               try (PipeProcessor processor =
                   (PipeProcessor) 
pipeProcessorConstructor.reflectPluginByKey(pluginName)) {
                 return processor.getClass().getSuperclass() == parentClass;
-              } catch (Exception e) {
+              } catch (final Exception e) {
                 return false;
               }
             })
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
index f16c6387cf5..e0868156ca9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
@@ -98,7 +98,7 @@ public abstract class PipeSubtaskExecutor {
 
   public final synchronized void register(final PipeSubtask subtask) {
     if (registeredIdSubtaskMapper.containsKey(subtask.getTaskID())) {
-      LOGGER.warn("The subtask {} is already registered.", 
subtask.getTaskID());
+      LOGGER.warn("The subtask {} is already registered.", 
getSafeSubtaskStr(subtask.getTaskID()));
       return;
     }
 
@@ -107,32 +107,36 @@ public abstract class PipeSubtaskExecutor {
         subtaskWorkerThreadPoolExecutor, subtaskCallbackListeningExecutor, 
schedulerSupplier(this));
   }
 
+  private static String getSafeSubtaskStr(final String subtaskID) {
+    return subtaskID.replaceAll("password=[^,}]*", "password=******");
+  }
+
   protected PipeSubtaskScheduler schedulerSupplier(final PipeSubtaskExecutor 
executor) {
     return new PipeSubtaskScheduler(executor);
   }
 
   public final synchronized void start(final String subTaskID) {
     if (!registeredIdSubtaskMapper.containsKey(subTaskID)) {
-      LOGGER.warn("The subtask {} is not registered.", subTaskID);
+      LOGGER.warn("The subtask {} is not registered.", 
getSafeSubtaskStr(subTaskID));
       return;
     }
 
     final PipeSubtask subtask = registeredIdSubtaskMapper.get(subTaskID);
     if (subtask.isSubmittingSelf()) {
       if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("The subtask {} is already running.", subTaskID);
+        LOGGER.debug("The subtask {} is already running.", 
getSafeSubtaskStr(subTaskID));
       }
     } else {
       subtask.allowSubmittingSelf();
       subtask.submitSelf();
       ++runningSubtaskNumber;
-      LOGGER.info("The subtask {} is started to submit self.", subTaskID);
+      LOGGER.info("The subtask {} is started to submit self.", 
getSafeSubtaskStr(subTaskID));
     }
   }
 
   public final synchronized void stop(final String subTaskID) {
     if (!registeredIdSubtaskMapper.containsKey(subTaskID)) {
-      LOGGER.warn("The subtask {} is not registered.", subTaskID);
+      LOGGER.warn("The subtask {} is not registered.", 
getSafeSubtaskStr(subTaskID));
       return;
     }
 
@@ -149,9 +153,9 @@ public abstract class PipeSubtaskExecutor {
     if (subtask != null) {
       try {
         subtask.close();
-        LOGGER.info("The subtask {} is closed successfully.", subTaskID);
+        LOGGER.info("The subtask {} is closed successfully.", 
getSafeSubtaskStr(subTaskID));
       } catch (final Exception e) {
-        LOGGER.error("Failed to close the subtask {}.", subTaskID, e);
+        LOGGER.error("Failed to close the subtask {}.", 
getSafeSubtaskStr(subTaskID), e);
       }
     }
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
index 095bd243073..04cf7f27bd2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
@@ -291,6 +291,9 @@ public abstract class IoTDBNonDataRegionSource extends 
IoTDBSource {
   //////////////////////////// APIs provided for metric framework 
////////////////////////////
 
   public long getUnTransferredEventCount() {
+    if (Objects.isNull(pipeTaskMeta)) {
+      return 0L;
+    }
     return !(pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex)
         ? getListeningQueue().getTailIndex()
             - ((MetaProgressIndex) pipeTaskMeta.getProgressIndex()).getIndex()
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
index 91a149c9f66..33acc06123e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
@@ -30,20 +30,25 @@ import org.apache.iotdb.pipe.api.PipeExtractor;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
 
+import javax.annotation.Nonnull;
+
 import java.util.Arrays;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_SKIP_IF_NO_PRIVILEGES;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_SKIP_IF_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_IOTDB_USER_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_SKIP_IF_KEY;
@@ -154,13 +159,14 @@ public abstract class IoTDBSource implements 
PipeExtractor {
   public void customize(
       final PipeParameters parameters, final PipeExtractorRuntimeConfiguration 
configuration)
       throws Exception {
-    final PipeTaskSourceRuntimeEnvironment environment =
-        ((PipeTaskSourceRuntimeEnvironment) 
configuration.getRuntimeEnvironment());
+    final PipeRuntimeEnvironment environment = 
configuration.getRuntimeEnvironment();
     regionId = environment.getRegionId();
     pipeName = environment.getPipeName();
     creationTime = environment.getCreationTime();
     taskID = pipeName + "_" + regionId + "_" + creationTime;
-    pipeTaskMeta = environment.getPipeTaskMeta();
+    if (environment instanceof PipeTaskSourceRuntimeEnvironment) {
+      pipeTaskMeta = ((PipeTaskSourceRuntimeEnvironment) 
environment).getPipeTaskMeta();
+    }
 
     final boolean isDoubleLiving =
         parameters.getBooleanOrDefault(
@@ -199,8 +205,15 @@ public abstract class IoTDBSource implements PipeExtractor 
{
     userEntity.setAuditLogOperation(AuditLogOperation.QUERY);
 
     skipIfNoPrivileges = getSkipIfNoPrivileges(parameters);
+    final String password =
+        parameters.getStringByKeys(EXTRACTOR_IOTDB_PASSWORD_KEY, 
SOURCE_IOTDB_PASSWORD_KEY);
+    if (Objects.nonNull(password)) {
+      login(password);
+    }
   }
 
+  protected abstract void login(final @Nonnull String password);
+
   public static boolean getSkipIfNoPrivileges(final PipeParameters 
extractorParameters) {
     final String extractorSkipIfValue =
         extractorParameters
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift 
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index b6569d943fb..92312ee81a3 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -445,6 +445,7 @@ struct TAuthizedPatternTreeResp {
 struct TLoginReq {
   1: required string userrname
   2: required string password
+  3: optional bool useEncryptedPassword
 }
 
 // reqtype : tree, relational, system

Reply via email to