This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 85ecbd60826 Pipe: Implemented the login check in customize method &
Added mosaic to subtask string in logger & Optimized the error code of illegal
pipe / pipe plugin name string (#17197)
85ecbd60826 is described below
commit 85ecbd608265313eac960520ec46eb03d833b169
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 25 17:46:46 2026 +0800
Pipe: Implemented the login check in customize method & Added mosaic to
subtask string in logger & Optimized the error code of illegal pipe / pipe
plugin name string (#17197)
* wb
* expr
* login
* login
* fix
* fix
* semantic
* mosaic
* fix
* fix
* fix
* fix
* fix
* lo
* if
* fix
* fix
* fix
* fix
* fix
* may-fix
* fix
* part
* fix
* pwce
* fix
* initial
* partial
* bug-fix
* fix
* ..
* st@ong
* fix
* fb
* 701
* bug-fix
* bug-fix
* further
* fix
* fix
---
.../treemodel/auto/basic/IoTDBPipeSyntaxIT.java | 4 +-
.../treemodel/manual/IoTDBPipePermissionIT.java | 128 ++++++++++++++++++++-
.../api/customizer/parameter/PipeParameters.java | 4 +-
.../api/exception/PipePasswordCheckException.java | 26 +++++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../iotdb/confignode/manager/ConfigManager.java | 5 +-
.../apache/iotdb/confignode/manager/IManager.java | 3 +-
.../confignode/manager/PermissionManager.java | 5 +-
.../receiver/protocol/IoTDBConfigNodeReceiver.java | 2 +-
.../pipe/source/IoTDBConfigRegionSource.java | 17 +++
.../confignode/persistence/auth/AuthorInfo.java | 5 +-
.../persistence/auth/AuthorPlanExecutor.java | 5 +-
.../persistence/auth/IAuthorPlanExecutor.java | 3 +-
.../impl/pipe/task/AlterPipeProcedureV2.java | 8 +-
.../impl/pipe/task/CreatePipeProcedureV2.java | 8 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 5 +-
.../confignode/persistence/AuthorInfoTest.java | 2 +-
.../pipe/metric/PipeConsensusSyncLagManager.java | 10 +-
.../org/apache/iotdb/db/auth/AuthorityChecker.java | 10 +-
.../iotdb/db/auth/ClusterAuthorityFetcher.java | 26 +++--
.../apache/iotdb/db/auth/IAuthorityFetcher.java | 3 +-
.../pipe/agent/plugin/PipeDataNodePluginAgent.java | 40 ++++---
.../dataregion/PipeDataRegionPluginAgent.java | 22 ++--
.../overview/PipeDataNodeSinglePipeMetrics.java | 16 +--
.../schema/PipeSchemaRegionSourceMetrics.java | 38 +++---
.../protocol/thrift/IoTDBDataNodeReceiver.java | 2 +-
.../sink/protocol/writeback/WriteBackSink.java | 23 ++++
.../source/dataregion/IoTDBDataRegionSource.java | 114 +++++++++++-------
...istoricalDataRegionTsFileAndDeletionSource.java | 17 +--
.../realtime/PipeRealtimeDataRegionSource.java | 10 +-
.../schemaregion/IoTDBSchemaRegionSource.java | 32 +++++-
.../schemaregion/SchemaRegionListeningFilter.java | 7 +-
.../db/protocol/session/InternalClientSession.java | 2 +-
.../iotdb/db/protocol/session/SessionManager.java | 26 ++++-
.../execution/config/TreeConfigTaskVisitor.java | 7 +-
.../config/executor/ClusterConfigTaskExecutor.java | 121 +++++++++++++------
.../execution/config/sys/pipe/AlterPipeTask.java | 8 +-
.../execution/config/sys/pipe/CreatePipeTask.java | 20 ++--
.../config/sys/pipe/PipeFunctionSupport.java | 2 +-
.../apache/iotdb/db/utils/DataNodeAuthUtils.java | 6 +-
.../auth/authorizer/LocalFileAuthorizerTest.java | 8 +-
.../db/auth/authorizer/OpenIdAuthorizerTest.java | 8 +-
.../commons/auth/authorizer/BasicAuthorizer.java | 7 +-
.../iotdb/commons/auth/authorizer/IAuthorizer.java | 3 +-
.../commons/auth/authorizer/OpenIdAuthorizer.java | 3 +-
.../commons/pipe/agent/plugin/PipePluginAgent.java | 33 +++---
.../agent/task/execution/PipeSubtaskExecutor.java | 18 +--
.../pipe/source/IoTDBNonDataRegionSource.java | 3 +
.../iotdb/commons/pipe/source/IoTDBSource.java | 19 ++-
.../src/main/thrift/confignode.thrift | 1 +
50 files changed, 643 insertions(+), 253 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
index 6ae8d56bfaa..87bc7b0e95b 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
@@ -396,7 +396,7 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeDualTreeModelAutoIT {
+ "PipePlugin.jar"));
fail();
} catch (final SQLException e) {
- Assert.assertTrue(e.getMessage().contains("1600: Failed to create pipe
plugin"));
+ Assert.assertTrue(e.getMessage().contains("701: Failed to create pipe
plugin"));
}
}
@@ -860,7 +860,7 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeDualTreeModelAutoIT {
fail();
} catch (final SQLException e) {
Assert.assertEquals(
- "1603: Failed to get executable for PipePlugin TestProcessor,
please check the URI.",
+ "701: Failed to get executable for PipePlugin TestProcessor,
please check the URI.",
e.getMessage());
}
try {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
index 35403c4dc6d..24807622cf6 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
@@ -311,8 +311,7 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualTreeModelManualIT {
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
fail("Shall fail if password is wrong.");
} catch (final SQLException e) {
- Assert.assertTrue(
- e.getMessage().contains("Fail to CREATE_PIPE because Authentication
failed."));
+ Assert.assertEquals("801: Failed to check password for pipe a2b.",
e.getMessage());
}
// Use current session, user is root
@@ -506,4 +505,129 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualTreeModelManualIT {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testIllegalPassword() throws Exception {
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ "create user `thulab` 'ST@ongpasswd123456'",
+ "create role `admin`",
+ "grant role `admin` to `thulab`",
+ "grant WRITE, READ, SYSTEM, SECURITY on root.** to role `admin`"),
+ null);
+
+ TestUtils.executeNonQuery(
+ senderEnv,
+ "create aligned timeSeries root.vehicle.plane(temperature DOUBLE,
pressure INT32)");
+ TestUtils.executeNonQuery(
+ receiverEnv,
+ "create aligned timeSeries root.vehicle.plane(temperature DOUBLE,
pressure INT32)");
+
+ Connection connection = senderEnv.getConnection();
+ Statement statement = connection.createStatement();
+ try {
+ statement.execute(
+ String.format(
+ "create pipe a2b"
+ + " with source ("
+ + "'user'='thulab'"
+ + ", 'password'='passwd')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
+ fail();
+ } catch (final Exception e) {
+ Assert.assertEquals("801: Failed to check password for pipe a2b.",
e.getMessage());
+ }
+
+ try {
+ statement.execute(
+ "create pipe a2b ('sink'='write-back-sink', 'user'='thulab',
'password'='passwd')");
+ fail();
+ } catch (final Exception e) {
+ Assert.assertEquals("801: Failed to check password for pipe a2b.",
e.getMessage());
+ }
+
+ statement.execute(
+ String.format(
+ "create pipe a2b"
+ + " with source ("
+ + "'user'='thulab'"
+ + ", 'password'='ST@ongpasswd123456')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
+
+ TestUtils.executeNonQuery(
+ senderEnv, "insert into root.vehicle.plane(temperature, pressure)
values (36.5, 1103)");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select count(pressure) from root.vehicle.plane",
+ "count(root.vehicle.plane.pressure),",
+ Collections.singleton("1,"));
+
+ statement.execute("alter user thulab set password 'newST@ongPassword'");
+
+ try {
+ TestUtils.restartCluster(senderEnv);
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ return;
+ }
+
+ connection = senderEnv.getConnection();
+ statement = connection.createStatement();
+ TestUtils.executeNonQuery(
+ senderEnv, "insert into root.vehicle.plane(temperature, pressure)
values (36.5, 1103)");
+
+ TestUtils.assertDataAlwaysOnEnv(
+ receiverEnv,
+ "select count(pressure) from root.vehicle.plane",
+ "count(root.vehicle.plane.pressure),",
+ Collections.singleton("1,"));
+
+ try {
+ statement.execute("alter pipe a2b modify source ('password'='fake')");
+ } catch (final SQLException e) {
+ Assert.assertEquals("801: Failed to check password for pipe a2b.",
e.getMessage());
+ }
+
+ statement.execute("alter pipe a2b modify source
('password'='newST@ongPassword')");
+
+ // Test empty alter
+ statement.execute("alter pipe a2b");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select count(pressure) from root.vehicle.plane",
+ "count(root.vehicle.plane.pressure),",
+ Collections.singleton("2,"));
+
+ statement.execute("alter user thulab set password
'anotherST@ongPassword'");
+
+ try {
+ TestUtils.restartCluster(senderEnv);
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ return;
+ }
+
+ connection = senderEnv.getConnection();
+ statement = connection.createStatement();
+ TestUtils.executeNonQuery(
+ senderEnv, "insert into root.vehicle.plane(temperature, pressure)
values (36.5, 1103)");
+ statement.execute("alter user thulab set password 'newST@ongPassword'");
+ statement.execute("alter pipe a2b");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select count(pressure) from root.vehicle.plane",
+ "count(root.vehicle.plane.pressure),",
+ Collections.singleton("3,"));
+
+ statement.close();
+ connection.close();
+ }
}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index c4e7edd63d0..c13f87ae579 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -371,7 +371,7 @@ public class PipeParameters {
return new PipeParameters(thisMap);
}
- private static class KeyReducer {
+ public static class KeyReducer {
private static final Set<String> FIRST_PREFIXES = new HashSet<>();
private static final Set<String> SECOND_PREFIXES = new HashSet<>();
@@ -399,7 +399,7 @@ public class PipeParameters {
return key;
}
- static String reduce(String key) {
+ public static String reduce(String key) {
if (key == null) {
return null;
}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipePasswordCheckException.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipePasswordCheckException.java
new file mode 100644
index 00000000000..10dce32c591
--- /dev/null
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipePasswordCheckException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api.exception;
+
+public class PipePasswordCheckException extends PipeException {
+ public PipePasswordCheckException(final String message) {
+ super(message);
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 019e643fa33..6af20dc2f53 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -265,6 +265,7 @@ public enum TSStatusCode {
CREATE_PIPE_PLUGIN_ERROR(1600),
DROP_PIPE_PLUGIN_ERROR(1601),
PIPE_PLUGIN_LOAD_CLASS_ERROR(1602),
+ @Deprecated
PIPE_PLUGIN_DOWNLOAD_ERROR(1603),
CREATE_PIPE_PLUGIN_ON_DATANODE_ERROR(1604),
DROP_PIPE_PLUGIN_ON_DATANODE_ERROR(1605),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index e51d1a43299..f455edb26b8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1332,10 +1332,11 @@ public class ConfigManager implements IManager {
}
@Override
- public TPermissionInfoResp login(String username, String password) {
+ public TPermissionInfoResp login(
+ final String username, final String password, final boolean
useEncryptedPassword) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return permissionManager.login(username, password);
+ return permissionManager.login(username, password, useEncryptedPassword);
} else {
TPermissionInfoResp resp = AuthUtils.generateEmptyPermissionInfoResp();
resp.setStatus(status);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index fe83b8bd0c1..02c82164595 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -501,7 +501,8 @@ public interface IManager {
DataSet queryPermission(final AuthorPlan authorPlan);
/** login. */
- TPermissionInfoResp login(String username, String password);
+ TPermissionInfoResp login(
+ final String username, final String password, final boolean
useEncryptedPassword);
/** Check User Privileges. */
TPermissionInfoResp checkUserPrivileges(String username, PrivilegeUnion
union);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index 06be2818593..ee39bfc2b19 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -112,8 +112,9 @@ public class PermissionManager {
return configManager.getConsensusManager();
}
- public TPermissionInfoResp login(String username, String password) {
- return authorInfo.login(username, password);
+ public TPermissionInfoResp login(
+ final String username, final String password, final boolean
useEncryptedPassword) {
+ return authorInfo.login(username, password, useEncryptedPassword);
}
public String login4Pipe(final String userName, final String password) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index d9f6e07be2b..81e82b96ade 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -1225,7 +1225,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
@Override
protected TSStatus login() {
- return configManager.login(username, password).getStatus();
+ return configManager.login(username, password, false).getStatus();
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
index 60512887703..eca4943c46f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
@@ -55,8 +55,11 @@ import org.apache.iotdb.pipe.api.annotation.TreeModel;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipePasswordCheckException;
import org.apache.iotdb.rpc.TSStatusCode;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Collections;
@@ -108,6 +111,20 @@ public class IoTDBConfigRegionSource extends
IoTDBNonDataRegionSource {
PipeConfigNodeRemainingTimeMetrics.getInstance().register(this);
}
+ @Override
+ protected void login(final @Nonnull String password) {
+ if (ConfigNode.getInstance()
+ .getConfigManager()
+ .getPermissionManager()
+ .login(userName, password, true)
+ .getStatus()
+ .getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipePasswordCheckException(
+ String.format("Failed to check password for pipe %s.", pipeName));
+ }
+ }
+
@Override
protected AbstractPipeListeningQueue getListeningQueue() {
return PipeConfigNodeAgent.runtime().listener();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorInfo.java
index 743e0a3f09a..d7404009825 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorInfo.java
@@ -124,8 +124,9 @@ public class AuthorInfo implements SnapshotProcessor {
this.authorPlanExecutor = authorPlanExecutor;
}
- public TPermissionInfoResp login(String username, String password) {
- return authorPlanExecutor.login(username, password);
+ public TPermissionInfoResp login(
+ final String username, final String password, final boolean
useEncryptedPassword) {
+ return authorPlanExecutor.login(username, password, useEncryptedPassword);
}
public String login4Pipe(final String username, final String password) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorPlanExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorPlanExecutor.java
index 5e8843418c1..8f216df3250 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorPlanExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorPlanExecutor.java
@@ -74,13 +74,14 @@ public class AuthorPlanExecutor implements
IAuthorPlanExecutor {
}
@Override
- public TPermissionInfoResp login(String username, String password) {
+ public TPermissionInfoResp login(
+ String username, final String password, final boolean
useEncryptedPassword) {
boolean status;
String loginMessage = null;
TSStatus tsStatus = new TSStatus();
TPermissionInfoResp result = new TPermissionInfoResp();
try {
- status = authorizer.login(username, password);
+ status = authorizer.login(username, password, useEncryptedPassword);
if (status) {
// Bring this user's permission information back to the datanode for
caching
if (authorizer instanceof OpenIdAuthorizer) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/IAuthorPlanExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/IAuthorPlanExecutor.java
index 24f0d8ceb0a..724ebf43f73 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/IAuthorPlanExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/IAuthorPlanExecutor.java
@@ -33,7 +33,8 @@ import
org.apache.iotdb.confignode.rpc.thrift.TAuthizedPatternTreeResp;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
public interface IAuthorPlanExecutor {
- TPermissionInfoResp login(String username, String password);
+ TPermissionInfoResp login(
+ final String username, final String password, final boolean
useEncryptedPassword);
String login4Pipe(final String username, final String password);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 53f908bf4fc..921661d13de 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -107,14 +107,18 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY,
PipeSourceConstant.SOURCE_IOTDB_USER_KEY,
PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY,
- PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY);
+ PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY,
+ PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY,
+ PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY);
final boolean checkSink =
new PipeParameters(alterPipeRequest.getConnectorAttributes())
.hasAnyAttributes(
PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY,
PipeSinkConstant.SINK_IOTDB_USER_KEY,
PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY,
- PipeSinkConstant.SINK_IOTDB_USERNAME_KEY);
+ PipeSinkConstant.SINK_IOTDB_USERNAME_KEY,
+ PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY,
+ PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY);
pipeTaskInfo.get().checkAndUpdateRequestBeforeAlterPipe(alterPipeRequest);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index f99293f0a02..f880bbdb85c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -174,7 +174,9 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
if
(sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY)
||
sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USER_KEY)
||
sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY)
- ||
sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)) {
+ ||
sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)
+ ||
sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY)
+ ||
sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)) {
final String hashedPassword =
env.getConfigManager()
.getPermissionManager()
@@ -216,7 +218,9 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
if (sinkParameters.hasAttribute(PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY)
|| sinkParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_USER_KEY)
||
sinkParameters.hasAttribute(PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY)
- ||
sinkParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_USERNAME_KEY)) {
+ ||
sinkParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_USERNAME_KEY)
+ ||
sinkParameters.hasAttribute(PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY)
+ ||
sinkParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY)) {
final String hashedPassword =
env.getConfigManager()
.getPermissionManager()
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 26fed0e1c4a..5d6aa8da9f5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -744,7 +744,10 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
@Override
public TPermissionInfoResp login(TLoginReq req) {
- return configManager.login(req.getUserrname(), req.getPassword());
+ return configManager.login(
+ req.getUserrname(),
+ req.getPassword(),
+ req.isSetUseEncryptedPassword() && req.isUseEncryptedPassword());
}
@Override
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java
index 151307e280d..34a9a411133 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/AuthorInfoTest.java
@@ -664,7 +664,7 @@ public class AuthorInfoTest {
new ArrayList<>());
status = authorInfo.authorNonQuery(authorPlan);
assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- TPermissionInfoResp result = authorInfo.login("testuser",
"password123456");
+ TPermissionInfoResp result = authorInfo.login("testuser",
"password123456", false);
assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
result.getStatus().getCode());
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
index bd9a2fd73d3..ac89d99df35 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
@@ -126,27 +126,27 @@ public class PipeConsensusSyncLagManager {
}
private static class PipeConsensusSyncLagManagerHolder {
- private static Map<String, PipeConsensusSyncLagManager>
CONSENSU_GROUP_ID_2_INSTANCE_MAP;
+ private static Map<String, PipeConsensusSyncLagManager>
CONSENSUS_GROUP_ID_2_INSTANCE_MAP;
private PipeConsensusSyncLagManagerHolder() {
// empty constructor
}
private static void build() {
- if (CONSENSU_GROUP_ID_2_INSTANCE_MAP == null) {
- CONSENSU_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>();
+ if (CONSENSUS_GROUP_ID_2_INSTANCE_MAP == null) {
+ CONSENSUS_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>();
}
}
}
public static PipeConsensusSyncLagManager getInstance(String groupId) {
- return
PipeConsensusSyncLagManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent(
+ return
PipeConsensusSyncLagManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent(
groupId, key -> new PipeConsensusSyncLagManager());
}
public static void release(String groupId) {
PipeConsensusSyncLagManager.getInstance(groupId).clear();
-
PipeConsensusSyncLagManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.remove(groupId);
+
PipeConsensusSyncLagManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP.remove(groupId);
}
// Only when consensus protocol is PipeConsensus, this method will be called
once when construct
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
index 81db578ace1..21c3bc787ee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
@@ -142,8 +142,14 @@ public class AuthorityChecker {
return Optional.ofNullable(user == null ? null : user.getUserId());
}
- public static TSStatus checkUser(String userName, String password) {
- TSStatus status = authorityFetcher.get().checkUser(userName, password);
+ public static TSStatus checkUser(final String userName, final String
password) {
+ return checkUser(userName, password, false);
+ }
+
+ public static TSStatus checkUser(
+ final String userName, final String password, final boolean
useEncryptedPassword) {
+ final TSStatus status =
+ authorityFetcher.get().checkUser(userName, password,
useEncryptedPassword);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
index 2894ce8bb47..325476174c6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
@@ -543,23 +543,31 @@ public class ClusterAuthorityFetcher implements
IAuthorityFetcher {
}
@Override
- public TSStatus checkUser(String username, String password) {
+ public TSStatus checkUser(
+ final String username, final String password, final boolean
useEncryptedPassword) {
checkCacheAvailable();
- User user = iAuthorCache.getUserCache(username);
+ final User user = iAuthorCache.getUserCache(username);
if (user != null) {
if (user.isOpenIdUser()) {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- } else if (password != null && AuthUtils.validatePassword(password,
user.getPassword())) {
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- } else if (password != null
- && AuthUtils.validatePassword(
- password, user.getPassword(),
AsymmetricEncrypt.DigestAlgorithm.MD5)) {
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ } else if (password != null) {
+ if (useEncryptedPassword) {
+ return password.equals(user.getPassword())
+ ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+ : RpcUtils.getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD,
"Authentication failed.");
+ } else {
+ return AuthUtils.validatePassword(password, user.getPassword())
+ || AuthUtils.validatePassword(
+ password, user.getPassword(),
AsymmetricEncrypt.DigestAlgorithm.MD5)
+ ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+ : RpcUtils.getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD,
"Authentication failed.");
+ }
} else {
return RpcUtils.getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD,
"Authentication failed.");
}
} else {
- TLoginReq req = new TLoginReq(username, password);
+ TLoginReq req =
+ new TLoginReq(username,
password).setUseEncryptedPassword(useEncryptedPassword);
TPermissionInfoResp status = null;
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java
index 3dc95fa41dc..30267987895 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java
@@ -36,7 +36,8 @@ import java.util.List;
public interface IAuthorityFetcher {
- TSStatus checkUser(String username, String password);
+ TSStatus checkUser(
+ final String username, final String password, final boolean
useEncryptedPassword);
boolean checkRole(String username, String roleName);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
index 11c1edafccb..d57956109d4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
@@ -27,7 +27,9 @@ import
org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableMa
import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils;
import
org.apache.iotdb.db.pipe.agent.plugin.dataregion.PipeDataRegionPluginAgent;
import
org.apache.iotdb.db.pipe.agent.plugin.schemaregion.PipeSchemaRegionPluginAgent;
+import
org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningFilter;
import org.apache.iotdb.pipe.api.PipePlugin;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
@@ -70,7 +72,7 @@ public class PipeDataNodePluginAgent {
/////////////////////////////// Pipe Plugin Management
///////////////////////////////
- public void register(PipePluginMeta pipePluginMeta, ByteBuffer jarFile)
+ public void register(final PipePluginMeta pipePluginMeta, final ByteBuffer
jarFile)
throws IOException, PipeException {
lock.lock();
try {
@@ -86,7 +88,7 @@ public class PipeDataNodePluginAgent {
}
}
- private void checkIfRegistered(PipePluginMeta pipePluginMeta) throws
PipeException {
+ private void checkIfRegistered(final PipePluginMeta pipePluginMeta) throws
PipeException {
final String pluginName = pipePluginMeta.getPluginName();
final PipePluginMeta information =
pipePluginMetaKeeper.getPipePluginMeta(pluginName);
if (information == null) {
@@ -121,7 +123,8 @@ public class PipeDataNodePluginAgent {
// we allow users to register the same pipe plugin multiple times without
any error
}
- private void saveJarFileIfNeeded(String pluginName, String jarName,
ByteBuffer byteBuffer)
+ private void saveJarFileIfNeeded(
+ final String pluginName, final String jarName, final ByteBuffer
byteBuffer)
throws IOException {
if (byteBuffer != null) {
PipePluginExecutableManager.getInstance()
@@ -136,7 +139,7 @@ public class PipeDataNodePluginAgent {
* @param pipePluginMeta the meta information of the PipePlugin
* @throws PipeException if the PipePlugin can not be loaded or its instance
can not be created
*/
- public void doRegister(PipePluginMeta pipePluginMeta) throws PipeException {
+ public void doRegister(final PipePluginMeta pipePluginMeta) throws
PipeException {
final String pluginName = pipePluginMeta.getPluginName();
final String className = pipePluginMeta.getClassName();
@@ -156,7 +159,7 @@ public class PipeDataNodePluginAgent {
pipePluginMetaKeeper.addPipePluginVisibility(
pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass));
classLoaderManager.addPluginAndClassLoader(pluginName,
pipePluginClassLoader);
- } catch (IOException
+ } catch (final IOException
| InstantiationException
| InvocationTargetException
| NoSuchMethodException
@@ -173,7 +176,8 @@ public class PipeDataNodePluginAgent {
}
}
- public void deregister(String pluginName, boolean needToDeleteJar) throws
PipeException {
+ public void deregister(final String pluginName, final boolean
needToDeleteJar)
+ throws PipeException {
lock.lock();
try {
final PipePluginMeta information =
pipePluginMetaKeeper.getPipePluginMeta(pluginName);
@@ -197,7 +201,7 @@ public class PipeDataNodePluginAgent {
PipePluginExecutableManager.getInstance()
.removeFileUnderTemporaryRoot(pluginName.toUpperCase() + ".txt");
}
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new PipeException(e.getMessage(), e);
} finally {
lock.unlock();
@@ -206,20 +210,22 @@ public class PipeDataNodePluginAgent {
// TODO: validate pipe plugin attributes for config node
public void validate(
- String pipeName,
- Map<String, String> extractorAttributes,
- Map<String, String> processorAttributes,
- Map<String, String> connectorAttributes)
+ final String pipeName,
+ final Map<String, String> sourceAttributes,
+ final Map<String, String> processorAttributes,
+ final Map<String, String> sinkAttributes)
throws Exception {
- dataRegionAgent.validate(
- pipeName, extractorAttributes, processorAttributes,
connectorAttributes);
- schemaRegionAgent.validate(
- pipeName, extractorAttributes, processorAttributes,
connectorAttributes);
+ dataRegionAgent.validate(pipeName, sourceAttributes, processorAttributes,
sinkAttributes);
+
+ if (SchemaRegionListeningFilter.shouldSchemaRegionBeListened(
+ new PipeParameters(sinkAttributes))) {
+ schemaRegionAgent.validate(pipeName, sourceAttributes,
processorAttributes, sinkAttributes);
+ }
}
public boolean checkIfPluginSameType(final String oldPluginName, final
String newPluginName) {
- PipePluginMeta oldPipePluginMeta =
pipePluginMetaKeeper.getPipePluginMeta(oldPluginName);
- PipePluginMeta newPipePluginMeta =
pipePluginMetaKeeper.getPipePluginMeta(newPluginName);
+ final PipePluginMeta oldPipePluginMeta =
pipePluginMetaKeeper.getPipePluginMeta(oldPluginName);
+ final PipePluginMeta newPipePluginMeta =
pipePluginMetaKeeper.getPipePluginMeta(newPluginName);
if (oldPipePluginMeta == null) {
throw new PipeException(String.format("plugin %s is not registered.",
oldPluginName));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
index a8d002fb27c..67a7d6549d9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java
@@ -62,24 +62,24 @@ public class PipeDataRegionPluginAgent extends
PipePluginAgent {
@Override
public void validate(
- String pipeName,
- Map<String, String> sourceAttributes,
- Map<String, String> processorAttributes,
- Map<String, String> sinkAttributes)
+ final String pipeName,
+ final Map<String, String> sourceAttributes,
+ final Map<String, String> processorAttributes,
+ final Map<String, String> sinkAttributes)
throws Exception {
- PipeExtractor temporaryExtractor = validateSource(sourceAttributes);
- PipeProcessor temporaryProcessor = validateProcessor(processorAttributes);
- PipeConnector temporaryConnector = validateSink(pipeName, sinkAttributes);
+ final PipeExtractor temporaryExtractor = validateSource(pipeName,
sourceAttributes);
+ final PipeProcessor temporaryProcessor =
validateProcessor(processorAttributes);
+ final PipeConnector temporaryConnector = validateSink(pipeName,
sinkAttributes);
// validate visibility
// TODO: validate visibility for schema region and config region
- Visibility pipeVisibility =
+ final Visibility pipeVisibility =
VisibilityUtils.calculateFromExtractorParameters(new
PipeParameters(sourceAttributes));
- Visibility extractorVisibility =
+ final Visibility extractorVisibility =
VisibilityUtils.calculateFromPluginClass(temporaryExtractor.getClass());
- Visibility processorVisibility =
+ final Visibility processorVisibility =
VisibilityUtils.calculateFromPluginClass(temporaryProcessor.getClass());
- Visibility connectorVisibility =
+ final Visibility connectorVisibility =
VisibilityUtils.calculateFromPluginClass(temporaryConnector.getClass());
if (!VisibilityUtils.isCompatible(
pipeVisibility, extractorVisibility, processorVisibility,
connectorVisibility)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
index 8c1ef90f97f..6535d371a91 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
@@ -195,29 +195,29 @@ public class PipeDataNodeSinglePipeMetrics implements
IMetricSet {
//////////////////////////// register & deregister (pipe integration)
////////////////////////////
- public void register(final IoTDBDataRegionSource extractor) {
+ public void register(final IoTDBDataRegionSource source) {
// The metric is global thus the regionId is omitted
- final String pipeID = extractor.getPipeName() + "_" +
extractor.getCreationTime();
+ final String pipeID = source.getPipeName() + "_" +
source.getCreationTime();
remainingEventAndTimeOperatorMap.computeIfAbsent(
pipeID,
k ->
new PipeDataNodeRemainingEventAndTimeOperator(
- extractor.getPipeName(), extractor.getCreationTime()));
+ source.getPipeName(), source.getCreationTime()));
if (Objects.nonNull(metricService)) {
createMetrics(pipeID);
}
}
- public void register(final IoTDBSchemaRegionSource extractor) {
+ public void register(final IoTDBSchemaRegionSource source) {
// The metric is global thus the regionId is omitted
- final String pipeID = extractor.getPipeName() + "_" +
extractor.getCreationTime();
+ final String pipeID = source.getPipeName() + "_" +
source.getCreationTime();
remainingEventAndTimeOperatorMap
.computeIfAbsent(
pipeID,
k ->
new PipeDataNodeRemainingEventAndTimeOperator(
- extractor.getPipeName(), extractor.getCreationTime()))
- .register(extractor);
+ source.getPipeName(), source.getCreationTime()))
+ .register(source);
if (Objects.nonNull(metricService)) {
createMetrics(pipeID);
}
@@ -233,7 +233,7 @@ public class PipeDataNodeSinglePipeMetrics implements
IMetricSet {
public void decreaseInsertNodeEventCount(
final String pipeName, final long creationTime, final long transferTime)
{
- PipeDataNodeRemainingEventAndTimeOperator operator =
+ final PipeDataNodeRemainingEventAndTimeOperator operator =
remainingEventAndTimeOperatorMap.computeIfAbsent(
pipeName + "_" + creationTime,
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSourceMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSourceMetrics.java
index 9c752d9753f..ce5d60449fd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSourceMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSourceMetrics.java
@@ -42,14 +42,14 @@ public class PipeSchemaRegionSourceMetrics implements
IMetricSet {
@SuppressWarnings("java:S3077")
private volatile AbstractMetricService metricService;
- private final Map<String, IoTDBSchemaRegionSource> extractorMap = new
ConcurrentHashMap<>();
+ private final Map<String, IoTDBSchemaRegionSource> sourceMap = new
ConcurrentHashMap<>();
//////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
@Override
public void bindTo(final AbstractMetricService metricService) {
this.metricService = metricService;
- ImmutableSet.copyOf(extractorMap.keySet()).forEach(this::createMetrics);
+ ImmutableSet.copyOf(sourceMap.keySet()).forEach(this::createMetrics);
}
private void createMetrics(final String taskID) {
@@ -57,24 +57,24 @@ public class PipeSchemaRegionSourceMetrics implements
IMetricSet {
}
private void createAutoGauge(final String taskID) {
- final IoTDBSchemaRegionSource extractor = extractorMap.get(taskID);
+ final IoTDBSchemaRegionSource source = sourceMap.get(taskID);
metricService.createAutoGauge(
Metric.UNTRANSFERRED_SCHEMA_COUNT.toString(),
MetricLevel.IMPORTANT,
- extractorMap.get(taskID),
+ sourceMap.get(taskID),
IoTDBSchemaRegionSource::getUnTransferredEventCount,
Tag.NAME.toString(),
- extractor.getPipeName(),
+ source.getPipeName(),
Tag.REGION.toString(),
- String.valueOf(extractor.getRegionId()),
+ String.valueOf(source.getRegionId()),
Tag.CREATION_TIME.toString(),
- String.valueOf(extractor.getCreationTime()));
+ String.valueOf(source.getCreationTime()));
}
@Override
public void unbindFrom(final AbstractMetricService metricService) {
- ImmutableSet.copyOf(extractorMap.keySet()).forEach(this::deregister);
- if (!extractorMap.isEmpty()) {
+ ImmutableSet.copyOf(sourceMap.keySet()).forEach(this::deregister);
+ if (!sourceMap.isEmpty()) {
LOGGER.warn(
"Failed to unbind from pipe schema region extractor metrics,
extractor map not empty");
}
@@ -85,7 +85,7 @@ public class PipeSchemaRegionSourceMetrics implements
IMetricSet {
}
private void removeAutoGauge(final String taskID) {
- final IoTDBSchemaRegionSource extractor = extractorMap.get(taskID);
+ final IoTDBSchemaRegionSource extractor = sourceMap.get(taskID);
// pending event count
metricService.remove(
MetricType.AUTO_GAUGE,
@@ -100,41 +100,41 @@ public class PipeSchemaRegionSourceMetrics implements
IMetricSet {
//////////////////////////// register & deregister (pipe integration)
////////////////////////////
- public void register(final IoTDBSchemaRegionSource extractor) {
- final String taskID = extractor.getTaskID();
- extractorMap.putIfAbsent(taskID, extractor);
+ public void register(final IoTDBSchemaRegionSource source) {
+ final String taskID = source.getTaskID();
+ sourceMap.putIfAbsent(taskID, source);
if (Objects.nonNull(metricService)) {
createMetrics(taskID);
}
}
public void deregister(final String taskID) {
- if (!extractorMap.containsKey(taskID)) {
+ if (!sourceMap.containsKey(taskID)) {
LOGGER.warn(
- "Failed to deregister pipe schema region extractor metrics,
IoTDBSchemaRegionExtractor({}) does not exist",
+ "Failed to deregister pipe schema region source metrics,
IoTDBSchemaRegionSource({}) does not exist",
taskID);
return;
}
if (Objects.nonNull(metricService)) {
removeMetrics(taskID);
}
- extractorMap.remove(taskID);
+ sourceMap.remove(taskID);
}
//////////////////////////// singleton ////////////////////////////
- private static class PipeSchemaRegionExtractorMetricsHolder {
+ private static class PipeSchemaRegionSourceMetricsHolder {
private static final PipeSchemaRegionSourceMetrics INSTANCE =
new PipeSchemaRegionSourceMetrics();
- private PipeSchemaRegionExtractorMetricsHolder() {
+ private PipeSchemaRegionSourceMetricsHolder() {
// Empty constructor
}
}
public static PipeSchemaRegionSourceMetrics getInstance() {
- return PipeSchemaRegionExtractorMetricsHolder.INSTANCE;
+ return PipeSchemaRegionSourceMetricsHolder.INSTANCE;
}
private PipeSchemaRegionSourceMetrics() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 904b335ec58..867e2487a50 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -953,7 +953,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
long userId = AuthorityChecker.getUserId(username).orElse(-1L);
- Long timeToExpire = DataNodeAuthUtils.checkPasswordExpiration(userId,
password);
+ Long timeToExpire = DataNodeAuthUtils.checkPasswordExpiration(userId,
password, false);
if (timeToExpire != null && timeToExpire <= System.currentTimeMillis()) {
return RpcUtils.getStatus(
TSStatusCode.ILLEGAL_PASSWORD.getStatusCode(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
index 9d476c54294..07b88a98053 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.pipe.api.exception.PipePasswordCheckException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -83,6 +84,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_CLI_HOSTNAME;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SKIP_IF_NO_PRIVILEGES;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_ID;
@@ -91,6 +93,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_USE_EVENT_USER_NAME_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_USE_EVENT_USER_NAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_CLI_HOSTNAME;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_ID;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY;
@@ -156,6 +159,8 @@ public class WriteBackSink implements PipeConnector {
SINK_IOTDB_USER_KEY,
CONNECTOR_IOTDB_USERNAME_KEY,
SINK_IOTDB_USERNAME_KEY);
+ String passwordString =
+ parameters.getStringByKeys(CONNECTOR_IOTDB_PASSWORD_KEY,
SINK_IOTDB_PASSWORD_KEY);
String cliHostnameString =
parameters.getStringByKeys(CONNECTOR_IOTDB_CLI_HOSTNAME,
SINK_IOTDB_CLI_HOSTNAME);
userEntity = new UserEntity(Long.parseLong(userIdString), usernameString,
cliHostnameString);
@@ -191,6 +196,24 @@ public class WriteBackSink implements PipeConnector {
if (SESSION_MANAGER.getCurrSession() == null) {
SESSION_MANAGER.registerSession(session);
}
+
+ // Check the password and its expiration
+ if (Objects.nonNull(passwordString)
+ && SESSION_MANAGER
+ .login(
+ session,
+ usernameString,
+ passwordString,
+ ZoneId.systemDefault().toString(),
+ SessionManager.CURRENT_RPC_VERSION,
+ IoTDBConstant.ClientVersion.V_1_0,
+ IClientSession.SqlDialect.TREE,
+ environment.getRegionId() >= 0)
+ .getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipePasswordCheckException(
+ String.format("Failed to check password for pipe %s.",
environment.getPipeName()));
+ }
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index d1ceb48e8aa..1c510be959b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.source.dataregion;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
@@ -39,6 +40,9 @@ import
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegio
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionLogSource;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionTsFileSource;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.pipe.api.annotation.TableModel;
import org.apache.iotdb.pipe.api.annotation.TreeModel;
@@ -50,11 +54,16 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.pipe.api.exception.PipePasswordCheckException;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+
+import java.time.ZoneId;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
@@ -128,8 +137,8 @@ public class IoTDBDataRegionSource extends IoTDBSource {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBDataRegionSource.class);
- private PipeHistoricalDataRegionSource historicalExtractor;
- private PipeRealtimeDataRegionSource realtimeExtractor;
+ private PipeHistoricalDataRegionSource historicalSource;
+ private PipeRealtimeDataRegionSource realtimeSource;
private DataRegionWatermarkInjector watermarkInjector;
@@ -294,11 +303,11 @@ public class IoTDBDataRegionSource extends IoTDBSource {
checkInvalidParameters(validator);
- constructHistoricalExtractor();
- constructRealtimeExtractor(validator.getParameters());
+ constructHistoricalSource();
+ constructRealtimeSource(validator.getParameters());
- historicalExtractor.validate(validator);
- realtimeExtractor.validate(validator);
+ historicalSource.validate(validator);
+ realtimeSource.validate(validator);
}
private void validatePattern(final TreePattern treePattern) {
@@ -430,16 +439,16 @@ public class IoTDBDataRegionSource extends IoTDBSource {
}
}
- private void constructHistoricalExtractor() {
- historicalExtractor = new
PipeHistoricalDataRegionTsFileAndDeletionSource();
+ private void constructHistoricalSource() {
+ historicalSource = new PipeHistoricalDataRegionTsFileAndDeletionSource();
}
- private void constructRealtimeExtractor(final PipeParameters parameters) {
+ private void constructRealtimeSource(final PipeParameters parameters) {
// Use heartbeat only source if disable realtime source
if (!parameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
- realtimeExtractor = new PipeRealtimeDataRegionHeartbeatSource();
+ realtimeSource = new PipeRealtimeDataRegionHeartbeatSource();
LOGGER.info(
"Pipe: '{}' ('{}') is set to false, use heartbeat realtime source.",
EXTRACTOR_REALTIME_ENABLE_KEY,
@@ -449,7 +458,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
// Use heartbeat only source if enable snapshot mode
if (PipeTaskAgent.isSnapshotMode(parameters)) {
- realtimeExtractor = new PipeRealtimeDataRegionHeartbeatSource();
+ realtimeSource = new PipeRealtimeDataRegionHeartbeatSource();
LOGGER.info("Pipe: snapshot mode is enabled, use heartbeat realtime
source.");
return;
}
@@ -457,7 +466,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
// Use hybrid mode by default
if (!parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY,
SOURCE_MODE_STREAMING_KEY)
&& !parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY,
SOURCE_REALTIME_MODE_KEY)) {
- realtimeExtractor = new PipeRealtimeDataRegionHybridSource();
+ realtimeSource = new PipeRealtimeDataRegionHybridSource();
LOGGER.info(
"Pipe: '{}' ('{}') and '{}' ('{}') is not set, use hybrid mode by
default.",
EXTRACTOR_MODE_STREAMING_KEY,
@@ -473,9 +482,9 @@ public class IoTDBDataRegionSource extends IoTDBSource {
Arrays.asList(EXTRACTOR_MODE_STREAMING_KEY,
SOURCE_MODE_STREAMING_KEY),
EXTRACTOR_MODE_STREAMING_DEFAULT_VALUE);
if (isStreamingMode) {
- realtimeExtractor = new PipeRealtimeDataRegionHybridSource();
+ realtimeSource = new PipeRealtimeDataRegionHybridSource();
} else {
- realtimeExtractor = new PipeRealtimeDataRegionTsFileSource();
+ realtimeSource = new PipeRealtimeDataRegionTsFileSource();
}
return;
}
@@ -483,18 +492,18 @@ public class IoTDBDataRegionSource extends IoTDBSource {
switch (parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY,
SOURCE_REALTIME_MODE_KEY)) {
case EXTRACTOR_REALTIME_MODE_FILE_VALUE:
case EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE:
- realtimeExtractor = new PipeRealtimeDataRegionTsFileSource();
+ realtimeSource = new PipeRealtimeDataRegionTsFileSource();
break;
case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE:
case EXTRACTOR_REALTIME_MODE_LOG_VALUE:
case EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE:
- realtimeExtractor = new PipeRealtimeDataRegionHybridSource();
+ realtimeSource = new PipeRealtimeDataRegionHybridSource();
break;
case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE:
- realtimeExtractor = new PipeRealtimeDataRegionLogSource();
+ realtimeSource = new PipeRealtimeDataRegionLogSource();
break;
default:
- realtimeExtractor = new PipeRealtimeDataRegionHybridSource();
+ realtimeSource = new PipeRealtimeDataRegionHybridSource();
if (LOGGER.isWarnEnabled()) {
LOGGER.warn(
"Pipe: Unsupported source realtime mode: {}, create a hybrid
source.",
@@ -513,8 +522,8 @@ public class IoTDBDataRegionSource extends IoTDBSource {
super.customize(parameters, configuration);
- historicalExtractor.customize(parameters, configuration);
- realtimeExtractor.customize(parameters, configuration);
+ historicalSource.customize(parameters, configuration);
+ realtimeSource.customize(parameters, configuration);
// Set watermark injector
long watermarkIntervalInMs = EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE;
@@ -542,8 +551,29 @@ public class IoTDBDataRegionSource extends IoTDBSource {
// register metric after generating taskID
PipeDataRegionSourceMetrics.getInstance().register(this);
- PipeTsFileToTabletsMetrics.getInstance().register(this);
- PipeDataNodeSinglePipeMetrics.getInstance().register(this);
+ if (regionId >= 0) {
+ PipeTsFileToTabletsMetrics.getInstance().register(this);
+ PipeDataNodeSinglePipeMetrics.getInstance().register(this);
+ }
+ }
+
+ @Override
+ protected void login(final @Nonnull String password) {
+ if (SessionManager.getInstance()
+ .login(
+ new InternalClientSession("Source_login_session_" + regionId),
+ userName,
+ password,
+ ZoneId.systemDefault().toString(),
+ SessionManager.CURRENT_RPC_VERSION,
+ IoTDBConstant.ClientVersion.V_1_0,
+ IClientSession.SqlDialect.TREE,
+ regionId >= 0)
+ .getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipePasswordCheckException(
+ String.format("Failed to check password for pipe %s.", pipeName));
+ }
}
@Override
@@ -557,8 +587,8 @@ public class IoTDBDataRegionSource extends IoTDBSource {
"Pipe {}@{}: Starting historical source {} and realtime source {}.",
pipeName,
regionId,
- historicalExtractor.getClass().getSimpleName(),
- realtimeExtractor.getClass().getSimpleName());
+ historicalSource.getClass().getSimpleName(),
+ realtimeSource.getClass().getSimpleName());
super.start();
@@ -591,8 +621,8 @@ public class IoTDBDataRegionSource extends IoTDBSource {
"Pipe {}@{}: Started historical source {} and realtime source {}
successfully within {} ms.",
pipeName,
regionId,
- historicalExtractor.getClass().getSimpleName(),
- realtimeExtractor.getClass().getSimpleName(),
+ historicalSource.getClass().getSimpleName(),
+ realtimeSource.getClass().getSimpleName(),
System.currentTimeMillis() - startTime);
return;
}
@@ -603,21 +633,21 @@ public class IoTDBDataRegionSource extends IoTDBSource {
private void startHistoricalExtractorAndRealtimeExtractor(
final AtomicReference<Exception> exceptionHolder) {
try {
- // Start realtimeExtractor first to avoid losing data. This may cause
some
+ // Start realtimeSource first to avoid losing data. This may cause some
// retransmission, yet it is OK according to the idempotency of IoTDB.
// Note: The order of historical collection is flushing data -> adding
all tsFile events.
// There can still be writing when tsFile events are added. If we start
- // realtimeExtractor after the process, then this part of data will be
lost.
- realtimeExtractor.start();
- historicalExtractor.start();
+ // realtimeSource after the process, then this part of data will be lost.
+ realtimeSource.start();
+ historicalSource.start();
} catch (final Exception e) {
exceptionHolder.set(e);
LOGGER.warn(
"Pipe {}@{}: Start historical source {} and realtime source {}
error.",
pipeName,
regionId,
- historicalExtractor.getClass().getSimpleName(),
- realtimeExtractor.getClass().getSimpleName(),
+ historicalSource.getClass().getSimpleName(),
+ realtimeSource.getClass().getSimpleName(),
e);
}
}
@@ -635,14 +665,14 @@ public class IoTDBDataRegionSource extends IoTDBSource {
}
Event event = null;
- if (!historicalExtractor.hasConsumedAll()) {
- event = historicalExtractor.supply();
+ if (!historicalSource.hasConsumedAll()) {
+ event = historicalSource.supply();
} else {
if (Objects.nonNull(watermarkInjector)) {
event = watermarkInjector.inject();
}
if (Objects.isNull(event)) {
- event = realtimeExtractor.supply();
+ event = realtimeSource.supply();
}
}
@@ -665,8 +695,8 @@ public class IoTDBDataRegionSource extends IoTDBSource {
return;
}
- historicalExtractor.close();
- realtimeExtractor.close();
+ historicalSource.close();
+ realtimeSource.close();
if (Objects.nonNull(taskID)) {
PipeDataRegionSourceMetrics.getInstance().deregister(taskID);
}
@@ -675,20 +705,20 @@ public class IoTDBDataRegionSource extends IoTDBSource {
//////////////////////////// APIs provided for metric framework
////////////////////////////
public int getHistoricalTsFileInsertionEventCount() {
- return hasBeenStarted.get() && Objects.nonNull(historicalExtractor)
- ? historicalExtractor.getPendingQueueSize()
+ return hasBeenStarted.get() && Objects.nonNull(historicalSource)
+ ? historicalSource.getPendingQueueSize()
: 0;
}
public int getTabletInsertionEventCount() {
- return hasBeenStarted.get() ?
realtimeExtractor.getTabletInsertionEventCount() : 0;
+ return hasBeenStarted.get() ?
realtimeSource.getTabletInsertionEventCount() : 0;
}
public int getRealtimeTsFileInsertionEventCount() {
- return hasBeenStarted.get() ?
realtimeExtractor.getTsFileInsertionEventCount() : 0;
+ return hasBeenStarted.get() ?
realtimeSource.getTsFileInsertionEventCount() : 0;
}
public int getPipeHeartbeatEventCount() {
- return hasBeenStarted.get() ?
realtimeExtractor.getPipeHeartbeatEventCount() : 0;
+ return hasBeenStarted.get() ? realtimeSource.getPipeHeartbeatEventCount()
: 0;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 8aac47b01c7..321458e156b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -57,6 +57,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.DateTimeUtils;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
@@ -305,17 +306,17 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSource
return;
}
- final PipeTaskSourceRuntimeEnvironment environment =
- (PipeTaskSourceRuntimeEnvironment)
configuration.getRuntimeEnvironment();
+ final PipeRuntimeEnvironment environment =
configuration.getRuntimeEnvironment();
pipeName = environment.getPipeName();
creationTime = environment.getCreationTime();
- pipeTaskMeta = environment.getPipeTaskMeta();
- if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
- startIndex =
-
tryToExtractLocalProgressIndexForIoTV2(environment.getPipeTaskMeta().getProgressIndex());
- } else {
- startIndex = environment.getPipeTaskMeta().getProgressIndex();
+ if (environment instanceof PipeTaskSourceRuntimeEnvironment) {
+ pipeTaskMeta = ((PipeTaskSourceRuntimeEnvironment)
environment).getPipeTaskMeta();
+ if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
+ startIndex =
tryToExtractLocalProgressIndexForIoTV2(pipeTaskMeta.getProgressIndex());
+ } else {
+ startIndex = pipeTaskMeta.getProgressIndex();
+ }
}
dataRegionId = environment.getRegionId();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index de0ee264473..204c9c87d14 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -47,6 +47,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.pipe.api.PipeExtractor;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
@@ -205,8 +206,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
public void customize(
final PipeParameters parameters, final PipeExtractorRuntimeConfiguration
configuration)
throws Exception {
- final PipeTaskSourceRuntimeEnvironment environment =
- (PipeTaskSourceRuntimeEnvironment)
configuration.getRuntimeEnvironment();
+ final PipeRuntimeEnvironment environment =
configuration.getRuntimeEnvironment();
final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters);
@@ -215,7 +215,9 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
pipeName = environment.getPipeName();
dataRegionId = environment.getRegionId();
- pipeTaskMeta = environment.getPipeTaskMeta();
+ if (environment instanceof PipeTaskSourceRuntimeEnvironment) {
+ pipeTaskMeta = ((PipeTaskSourceRuntimeEnvironment)
environment).getPipeTaskMeta();
+ }
// Metrics related to TsFileEpoch are managed in PipeExtractorMetrics.
These metrics are
// indexed by the taskID of IoTDBDataRegionExtractor. To avoid
PipeRealtimeDataRegionExtractor
@@ -302,7 +304,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
- "Pipe {}@{}: realtime data region extractor is initialized with
parameters: {}.",
+ "Pipe {}@{}: realtime data region source is initialized with
parameters: {}.",
pipeName,
dataRegionId,
parameters);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
index b70ff4b7d8d..f743e1dc7cc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
@@ -42,6 +42,9 @@ import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEve
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionSourceMetrics;
import
org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementToBatchVisitor;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -58,12 +61,16 @@ import org.apache.iotdb.pipe.api.annotation.TreeModel;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipePasswordCheckException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.common.constant.TsFileConstant;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.nio.file.Paths;
+import java.time.ZoneId;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -107,7 +114,7 @@ public class IoTDBSchemaRegionSource extends
IoTDBNonDataRegionSource {
.getSchemaRegionConsensusProtocolClass()
.equals(ConsensusFactory.SIMPLE_CONSENSUS)) {
throw new PipeException(
- "IoTDBSchemaRegionExtractor does not transferring events under
simple consensus");
+ "IoTDBSchemaRegionSource does not support transferring events under
simple consensus");
}
super.customize(parameters, configuration);
@@ -117,7 +124,9 @@ public class IoTDBSchemaRegionSource extends
IoTDBNonDataRegionSource {
treePrivilegeParseVisitor = new
PipePlanTreePrivilegeParseVisitor(skipIfNoPrivileges);
PipeSchemaRegionSourceMetrics.getInstance().register(this);
- PipeDataNodeSinglePipeMetrics.getInstance().register(this);
+ if (regionId >= 0) {
+ PipeDataNodeSinglePipeMetrics.getInstance().register(this);
+ }
}
@Override
@@ -146,6 +155,25 @@ public class IoTDBSchemaRegionSource extends
IoTDBNonDataRegionSource {
super.start();
}
+ @Override
+ protected void login(final @Nonnull String password) {
+ if (SessionManager.getInstance()
+ .login(
+ new InternalClientSession("Source_login_session_" + regionId),
+ userName,
+ password,
+ ZoneId.systemDefault().toString(),
+ SessionManager.CURRENT_RPC_VERSION,
+ IoTDBConstant.ClientVersion.V_1_0,
+ IClientSession.SqlDialect.TREE,
+ regionId >= 0)
+ .getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipePasswordCheckException(
+ String.format("Failed to check password for pipe %s.", pipeName));
+ }
+ }
+
@Override
protected boolean needTransferSnapshot() {
// Note: the schema region will transfer snapshot if there are table or
tree planNode captured.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/SchemaRegionListeningFilter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/SchemaRegionListeningFilter.java
index ad37f27ef95..b0501d00ae6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/SchemaRegionListeningFilter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/SchemaRegionListeningFilter.java
@@ -121,7 +121,12 @@ public class SchemaRegionListeningFilter {
.getDatabaseFullPath());
return (TreePattern.isTreeModelDataAllowToBeCaptured(parameters) &&
!isTableModel
|| TablePattern.isTableModelDataAllowToBeCaptured(parameters) &&
isTableModel)
- && !parseListeningPlanTypeSet(parameters).isEmpty();
+ && shouldSchemaRegionBeListened(parameters);
+ }
+
+ public static boolean shouldSchemaRegionBeListened(final PipeParameters
parameters)
+ throws IllegalPathException {
+ return !parseListeningPlanTypeSet(parameters).isEmpty();
}
public static Set<PlanNodeType> parseListeningPlanTypeSet(final
PipeParameters parameters)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java
index 975cbb09437..d6cdfa63a32 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java
@@ -42,7 +42,7 @@ public class InternalClientSession extends IClientSession {
@Override
public String getClientAddress() {
- return clientID;
+ return "";
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
index ee00655211f..ac7cc236d9f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
@@ -130,11 +130,26 @@ public class SessionManager implements
SessionManagerMBean {
TSProtocolVersion tsProtocolVersion,
IoTDBConstant.ClientVersion clientVersion,
IClientSession.SqlDialect sqlDialect) {
- BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp();
-
- long userId = AuthorityChecker.getUserId(username).orElse(-1L);
+ return login(
+ session, username, password, zoneId, tsProtocolVersion, clientVersion,
sqlDialect, false);
+ }
- Long timeToExpire = DataNodeAuthUtils.checkPasswordExpiration(userId,
password);
+ // Only pipe can set useEncryptedPassword to true
+ public BasicOpenSessionResp login(
+ final IClientSession session,
+ final String username,
+ final String password,
+ final String zoneId,
+ final TSProtocolVersion tsProtocolVersion,
+ final IoTDBConstant.ClientVersion clientVersion,
+ final IClientSession.SqlDialect sqlDialect,
+ final boolean useEncryptedPassword) {
+ final BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp();
+
+ final long userId = AuthorityChecker.getUserId(username).orElse(-1L);
+
+ Long timeToExpire =
+ DataNodeAuthUtils.checkPasswordExpiration(userId, password,
useEncryptedPassword);
if (timeToExpire != null && timeToExpire <= System.currentTimeMillis()) {
openSessionResp
.sessionId(-1)
@@ -154,7 +169,8 @@ public class SessionManager implements SessionManagerMBean {
return openSessionResp;
}
- TSStatus loginStatus = AuthorityChecker.checkUser(username, password);
+ final TSStatus loginStatus =
+ AuthorityChecker.checkUser(username, password, useEncryptedPassword);
if (loginStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// check the version compatibility
if (!tsProtocolVersion.equals(CURRENT_RPC_VERSION)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
index 60deee79509..1ea10ef37e8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
@@ -683,16 +683,15 @@ public class TreeConfigTaskVisitor extends
StatementVisitor<IConfigTask, MPPQuer
alterPipeStatement.setUserName(userName);
final String pipeName = alterPipeStatement.getPipeName();
- final Map<String, String> extractorAttributes =
alterPipeStatement.getSourceAttributes();
+ final Map<String, String> sourceAttributes =
alterPipeStatement.getSourceAttributes();
// If the source is replaced, sql-dialect uses the current Alter Pipe
sql-dialect. If it is
// modified, the original sql-dialect is used.
if (alterPipeStatement.isReplaceAllSourceAttributes()) {
- extractorAttributes.put(
- SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TREE_VALUE);
+ sourceAttributes.put(SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TREE_VALUE);
checkAndEnrichSourceUser(
pipeName,
- extractorAttributes,
+ sourceAttributes,
new UserEntity(context.getUserId(), context.getUsername(),
context.getCliHostname()),
true);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index cb45757d3be..404990e124d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -328,6 +328,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.Compacti
import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
import org.apache.iotdb.pipe.api.PipePlugin;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.exception.PipePasswordCheckException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -372,6 +373,7 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -994,7 +996,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
future.setException(
new IoTDBException(
String.format("Failed to create pipe plugin %s. " + pathError,
pluginName),
- TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode()));
+ TSStatusCode.SEMANTIC_ERROR.getStatusCode()));
return future;
}
@@ -1002,7 +1004,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
future.setException(
new IoTDBException(
"Failed to create pipe plugin, because the URI is empty.",
- TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
+ TSStatusCode.SEMANTIC_ERROR.getStatusCode()));
return future;
}
@@ -1019,7 +1021,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
future.setException(
new IoTDBException(
"The scheme of URI is not set, please specify the scheme of
URI.",
- TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
+ TSStatusCode.SEMANTIC_ERROR.getStatusCode()));
return future;
}
if (!uri.getScheme().equals("file")) {
@@ -1059,7 +1061,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
"Failed to get executable for PipePlugin "
+ createPipePluginStatement.getPluginName()
+ ", please check the URI.",
- TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
+ TSStatusCode.SEMANTIC_ERROR.getStatusCode()));
return future;
}
@@ -2175,7 +2177,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
future.setException(
new IoTDBException(
String.format("Failed to create pipe %s, " + pathError,
pipeName),
- TSStatusCode.PIPE_ERROR.getStatusCode()));
+ TSStatusCode.SEMANTIC_ERROR.getStatusCode()));
return future;
}
@@ -2189,7 +2191,11 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
createPipeStatement.getSinkAttributes());
} catch (final Exception e) {
future.setException(
- new IoTDBException(e.getMessage(),
TSStatusCode.PIPE_ERROR.getStatusCode()));
+ new IoTDBException(
+ e.getMessage(),
+ e instanceof PipePasswordCheckException
+ ? TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode()
+ : TSStatusCode.PIPE_ERROR.getStatusCode()));
return future;
}
@@ -2353,6 +2359,8 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
return future;
}
+ boolean hasSourcePassword = false;
+ boolean hasSinkPassword = false;
// Construct temporary pipe static meta for validation
final String pipeName = alterPipeStatement.getPipeName();
final Map<String, String> sourceAttributes;
@@ -2381,9 +2389,12 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
sourceAttributes =
pipeMetaFromCoordinator.getStaticMeta().getSourceParameters().getAttribute();
if (onlyContainsUser) {
- checkSourceType(alterPipeStatement.getPipeName(),
sourceAttributes);
+ checkSourceTypeWithException(alterPipeStatement.getPipeName(),
sourceAttributes);
}
}
+ hasSourcePassword =
+ !checkSourceType(sourceAttributes)
+ || containsPassword(alterPipeStatement.getSourceAttributes());
} else {
sourceAttributes =
pipeMetaFromCoordinator.getStaticMeta().getSourceParameters().getAttribute();
@@ -2407,6 +2418,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
if (!alterPipeStatement.getSinkAttributes().isEmpty()) {
+ // Do not remove for handshake
if (alterPipeStatement.isReplaceAllSinkAttributes()) {
sinkAttributes = alterPipeStatement.getSinkAttributes();
} else {
@@ -2419,18 +2431,39 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
sinkAttributes =
pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute();
if (onlyContainsUser) {
- checkSinkType(alterPipeStatement.getPipeName(), sinkAttributes);
+ checkSinkTypeWithException(alterPipeStatement.getPipeName(),
sinkAttributes);
}
}
+ hasSinkPassword =
+ !checkSinkType(sinkAttributes)
+ || containsPassword(alterPipeStatement.getSinkAttributes());
} else {
sinkAttributes =
pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute();
}
+ final Map<String, String> checkedSource = new
HashMap<>(sourceAttributes);
+ if (!hasSourcePassword) {
+ checkedSource.remove(PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY);
+ checkedSource.remove(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY);
+ checkedSource.remove(
+
PipeParameters.KeyReducer.reduce(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
+ }
+ final Map<String, String> checkedSink = new HashMap<>(sinkAttributes);
+ if (!hasSinkPassword) {
+ checkedSink.remove(PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY);
+ checkedSink.remove(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY);
+ checkedSink.remove(
+
PipeParameters.KeyReducer.reduce(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY));
+ }
PipeDataNodeAgent.plugin()
- .validate(pipeName, sourceAttributes, processorAttributes,
sinkAttributes);
+ .validate(pipeName, checkedSource, processorAttributes, checkedSink);
} catch (final Exception e) {
future.setException(
- new IoTDBException(e.getMessage(),
TSStatusCode.PIPE_ERROR.getStatusCode()));
+ new IoTDBException(
+ e.getMessage(),
+ e instanceof PipePasswordCheckException
+ ? TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode()
+ : TSStatusCode.PIPE_ERROR.getStatusCode()));
return future;
}
@@ -2481,56 +2514,68 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
}
- private static void checkSourceType(
- final String pipeName, final Map<String, String>
replacedExtractorAttributes) {
- final PipeParameters extractorParameters = new
PipeParameters(replacedExtractorAttributes);
+ private static void checkSourceTypeWithException(
+ final String pipeName, final Map<String, String>
replacedSourceAttributes) {
+ if (checkSourceType(replacedSourceAttributes)) {
+ throw new SemanticException(
+ String.format(
+ "Failed to alter pipe %s, in iotdb-source, password must be set
when the username is specified.",
+ pipeName));
+ }
+ }
+
+ private static boolean checkSourceType(final Map<String, String>
replacedSourceAttributes) {
+ final PipeParameters sourceParameters = new
PipeParameters(replacedSourceAttributes);
final String pluginName =
- extractorParameters
+ sourceParameters
.getStringOrDefault(
Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY,
PipeSourceConstant.SOURCE_KEY),
BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
.toLowerCase();
- if
(pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
- ||
pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName())) {
- throw new SemanticException(
- String.format(
- "Failed to alter pipe %s, in iotdb-source, password must be set
when the username is specified.",
- pipeName));
- }
+ return
pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+ ||
pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName());
+ }
+
+ private static boolean containsPassword(final Map<String, String>
sourceOrSinkAttributes) {
+ final PipeParameters sourceOrSinkParameters = new
PipeParameters(sourceOrSinkAttributes);
+ return sourceOrSinkParameters.hasAnyAttributes(
+ PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY,
PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY);
}
- private static boolean onlyContainsUser(
- final Map<String, String> extractorOrConnectorAttributes) {
- final PipeParameters extractorOrConnectorParameters =
- new PipeParameters(extractorOrConnectorAttributes);
- return extractorOrConnectorParameters.hasAnyAttributes(
+ private static boolean onlyContainsUser(final Map<String, String>
sourceOrSinkAttributes) {
+ final PipeParameters sourceOrSinkParameters = new
PipeParameters(sourceOrSinkAttributes);
+ return sourceOrSinkParameters.hasAnyAttributes(
PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY,
PipeSinkConstant.SINK_IOTDB_USER_KEY,
PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY,
PipeSinkConstant.SINK_IOTDB_USERNAME_KEY)
- && !extractorOrConnectorParameters.hasAnyAttributes(
+ && !sourceOrSinkParameters.hasAnyAttributes(
PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY,
PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY);
}
- private static void checkSinkType(
- final String pipeName, final Map<String, String> connectorAttributes) {
- final PipeParameters connectorParameters = new
PipeParameters(connectorAttributes);
+ private static void checkSinkTypeWithException(
+ final String pipeName, final Map<String, String> sinkAttributes) {
+ if (checkSinkType(sinkAttributes)) {
+ throw new SemanticException(
+ String.format(
+ "Failed to alter pipe %s, in write-back-sink, password must be
set when the username is specified.",
+ pipeName));
+ }
+ }
+
+ private static boolean checkSinkType(final Map<String, String>
sinkAttributes) {
+ final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
final String pluginName =
- connectorParameters
+ sinkParameters
.getStringOrDefault(
Arrays.asList(PipeSinkConstant.CONNECTOR_KEY,
PipeSinkConstant.SINK_KEY),
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
.toLowerCase();
- if
(pluginName.equals(BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName())
- ||
pluginName.equals(BuiltinPipePlugin.WRITE_BACK_SINK.getPipePluginName())) {
- throw new SemanticException(
- String.format(
- "Failed to alter pipe %s, in write-back-sink, password must be
set when the username is specified.",
- pipeName));
- }
+ return
pluginName.equals(BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName())
+ ||
pluginName.equals(BuiltinPipePlugin.WRITE_BACK_SINK.getPipePluginName());
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
index aecadff7f81..9b5857d2635 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
@@ -75,25 +75,25 @@ public class AlterPipeTask implements IConfigTask {
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
// support now() function
- PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+ PipeFunctionSupport.applyNowFunction2SourceAttributes(
attributes,
PipeSourceConstant.SOURCE_START_TIME_KEY,
PipeSourceConstant.EXTRACTOR_START_TIME_KEY,
currentTime);
- PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+ PipeFunctionSupport.applyNowFunction2SourceAttributes(
attributes,
PipeSourceConstant.SOURCE_END_TIME_KEY,
PipeSourceConstant.EXTRACTOR_END_TIME_KEY,
currentTime);
- PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+ PipeFunctionSupport.applyNowFunction2SourceAttributes(
attributes,
PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY,
PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY,
currentTime);
- PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+ PipeFunctionSupport.applyNowFunction2SourceAttributes(
attributes,
PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY,
PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
index a5fbd36f88d..fc3884cb27e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
@@ -37,19 +37,19 @@ public class CreatePipeTask implements IConfigTask {
private final CreatePipeStatement createPipeStatement;
- public CreatePipeTask(CreatePipeStatement createPipeStatement) {
+ public CreatePipeTask(final CreatePipeStatement createPipeStatement) {
// support now() function
-
applyNowFunctionToExtractorAttributes(createPipeStatement.getSourceAttributes());
+
applyNowFunction2SourceAttributes(createPipeStatement.getSourceAttributes());
this.createPipeStatement = createPipeStatement;
}
- public CreatePipeTask(CreatePipe createPipe) {
+ public CreatePipeTask(final CreatePipe createPipe) {
createPipeStatement = new CreatePipeStatement(StatementType.CREATE_PIPE);
createPipeStatement.setPipeName(createPipe.getPipeName());
createPipeStatement.setIfNotExists(createPipe.hasIfNotExistsCondition());
// support now() function
- applyNowFunctionToExtractorAttributes(createPipe.getSourceAttributes());
+ applyNowFunction2SourceAttributes(createPipe.getSourceAttributes());
createPipeStatement.setSourceAttributes(createPipe.getSourceAttributes());
createPipeStatement.setProcessorAttributes(createPipe.getProcessorAttributes());
@@ -57,37 +57,37 @@ public class CreatePipeTask implements IConfigTask {
}
@Override
- public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
+ public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
return configTaskExecutor.createPipe(createPipeStatement);
}
- private void applyNowFunctionToExtractorAttributes(final Map<String, String>
attributes) {
+ private void applyNowFunction2SourceAttributes(final Map<String, String>
attributes) {
final long currentTime =
CommonDateTimeUtils.convertMilliTimeWithPrecision(
System.currentTimeMillis(),
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
// support now() function
- PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+ PipeFunctionSupport.applyNowFunction2SourceAttributes(
attributes,
PipeSourceConstant.SOURCE_START_TIME_KEY,
PipeSourceConstant.EXTRACTOR_START_TIME_KEY,
currentTime);
- PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+ PipeFunctionSupport.applyNowFunction2SourceAttributes(
attributes,
PipeSourceConstant.SOURCE_END_TIME_KEY,
PipeSourceConstant.EXTRACTOR_END_TIME_KEY,
currentTime);
- PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+ PipeFunctionSupport.applyNowFunction2SourceAttributes(
attributes,
PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY,
PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY,
currentTime);
- PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+ PipeFunctionSupport.applyNowFunction2SourceAttributes(
attributes,
PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY,
PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java
index 0edc18c3f00..e9fb09e8750 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java
@@ -31,7 +31,7 @@ public class PipeFunctionSupport {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeFunctionSupport.class);
- public static void applyNowFunctionToExtractorAttributes(
+ public static void applyNowFunction2SourceAttributes(
final Map<String, String> extractorAttributes,
final String sourceKey,
final String extractorKey,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java
index 187575c58da..6ee378282ad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DataNodeAuthUtils.java
@@ -275,7 +275,8 @@ public class DataNodeAuthUtils {
* @return the timestamp when the password will expire. Long.MAX if the
password never expires.
* Null if the password history cannot be found.
*/
- public static Long checkPasswordExpiration(long userId, String password) {
+ public static Long checkPasswordExpiration(
+ final long userId, final String password, final boolean
useEncryptedPassword) {
if (userId == -1) {
return null;
}
@@ -335,7 +336,8 @@ public class DataNodeAuthUtils {
CommonDateTimeUtils.convertIoTDBTimeToMillis(tsBlock.getTimeByIndex(0));
// columns of last query: [timeseriesName, value, dataType]
String oldPassword = tsBlock.getColumn(1).getBinary(0).toString();
- if (oldPassword.equals(AuthUtils.encryptPassword(password))) {
+ if (oldPassword.equals(
+ useEncryptedPassword ? password :
AuthUtils.encryptPassword(password))) {
if (lastPasswordTime + passwordExpirationDays * 1000 * 86400 <=
lastPasswordTime) {
// overflow or passwordExpirationDays <= 0
return Long.MAX_VALUE;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/LocalFileAuthorizerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/LocalFileAuthorizerTest.java
index 9d0acaf14d8..194f33e8d67 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/LocalFileAuthorizerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/LocalFileAuthorizerTest.java
@@ -64,8 +64,8 @@ public class LocalFileAuthorizerTest {
@Test
public void testLogin() throws AuthException {
- Assert.assertTrue(authorizer.login("root", "root"));
- Assert.assertThrows(AuthException.class, () -> authorizer.login("root",
"error"));
+ Assert.assertTrue(authorizer.login("root", "root", false));
+ Assert.assertThrows(AuthException.class, () -> authorizer.login("root",
"error", false));
}
@Test
@@ -76,7 +76,7 @@ public class LocalFileAuthorizerTest {
} catch (AuthException e) {
assertEquals("User user already exists", e.getMessage());
}
- Assert.assertTrue(authorizer.login(userName, password));
+ Assert.assertTrue(authorizer.login(userName, password, false));
authorizer.deleteUser(userName);
try {
authorizer.deleteUser(userName);
@@ -230,7 +230,7 @@ public class LocalFileAuthorizerTest {
public void testUpdatePassword() throws AuthException {
authorizer.createUser(userName, password);
authorizer.updateUserPassword(userName, "newPassword123456");
- Assert.assertTrue(authorizer.login(userName, "newPassword123456"));
+ Assert.assertTrue(authorizer.login(userName, "newPassword123456", false));
}
@Test
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/OpenIdAuthorizerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/OpenIdAuthorizerTest.java
index bd0c2d399e5..196cc80e5b6 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/OpenIdAuthorizerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/authorizer/OpenIdAuthorizerTest.java
@@ -60,7 +60,7 @@ public class OpenIdAuthorizerTest {
"eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJxMS1XbTBvelE1TzBtUUg4LVNKYXAyWmNONE1tdWNXd25RV0tZeFpKNG93In0.eyJleHAiOjE1OTAzMTcxNzYsImlhdCI6MTU5MDMxNjg3NiwianRpIjoiY2MyNWQ3MDAtYjc5NC00OTA4LTg0OGUtOTRhNzYzNmM5YzQxIiwiaXNzIjoiaHR0cDovL2F1dGguZGVtby5wcmFnbWF0aWNpbmR1c3RyaWVzLmRlL2F1dGgvcmVhbG1zL0lvVERCIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6Ijg2YWRmNGIzLWE4ZTUtNDc1NC1iNWEwLTQ4OGI0OWY0M2VkMiIsInR5cCI6IkJlYXJlciIsImF6cCI6ImlvdGRiIiwic2Vzc2lvbl9zdGF0ZSI6Ijk0ZmI5NGZjLTg3YTMtNDg4Ny04M2Q3LWE
[...]
OpenIdAuthorizer authorizer = new
OpenIdAuthorizer(JSONObjectUtils.parse(OPEN_ID_PUBLIC_JWK));
- boolean login = authorizer.login(jwt, null);
+ boolean login = authorizer.login(jwt, null, false);
assertTrue(login);
}
@@ -99,14 +99,16 @@ public class OpenIdAuthorizerTest {
boolean login =
openIdAuthorizer.login(
"eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJxMS1XbTBvelE1TzBtUUg4LVNKYXAyWmNONE1tdWNXd25RV0tZeFpKNG93In0.eyJleHAiOjE1OTAzMTcxNzYsImlhdCI6MTU5MDMxNjg3NiwianRpIjoiY2MyNWQ3MDAtYjc5NC00OTA4LTg0OGUtOTRhNzYzNmM5YzQxIiwiaXNzIjoiaHR0cDovL2F1dGguZGVtby5wcmFnbWF0aWNpbmR1c3RyaWVzLmRlL2F1dGgvcmVhbG1zL0lvVERCIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6Ijg2YWRmNGIzLWE4ZTUtNDc1NC1iNWEwLTQ4OGI0OWY0M2VkMiIsInR5cCI6IkJlYXJlciIsImF6cCI6ImlvdGRiIiwic2Vzc2lvbl9zdGF0ZSI6Ijk0ZmI5NGZjLTg3YTMtNDg4Ny04M2Q
[...]
- "");
+ "",
+ false);
assertTrue(login);
config.setOpenIdProviderUrl("https://auth.demo.pragmaticindustries.de/auth/realms/IoTDB/");
OpenIdAuthorizer openIdAuthorizer1 = new OpenIdAuthorizer();
login =
openIdAuthorizer1.login(
"eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJxMS1XbTBvelE1TzBtUUg4LVNKYXAyWmNONE1tdWNXd25RV0tZeFpKNG93In0.eyJleHAiOjE1OTAzMTcxNzYsImlhdCI6MTU5MDMxNjg3NiwianRpIjoiY2MyNWQ3MDAtYjc5NC00OTA4LTg0OGUtOTRhNzYzNmM5YzQxIiwiaXNzIjoiaHR0cDovL2F1dGguZGVtby5wcmFnbWF0aWNpbmR1c3RyaWVzLmRlL2F1dGgvcmVhbG1zL0lvVERCIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6Ijg2YWRmNGIzLWE4ZTUtNDc1NC1iNWEwLTQ4OGI0OWY0M2VkMiIsInR5cCI6IkJlYXJlciIsImF6cCI6ImlvdGRiIiwic2Vzc2lvbl9zdGF0ZSI6Ijk0ZmI5NGZjLTg3YTMtNDg4Ny04M2Q
[...]
- "");
+ "",
+ false);
assertTrue(login);
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java
index 98f70dc0ee6..6ba8c5336b1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/BasicAuthorizer.java
@@ -107,12 +107,17 @@ public abstract class BasicAuthorizer implements
IAuthorizer, IService {
}
@Override
- public boolean login(String username, String password) throws AuthException {
+ public boolean login(
+ final String username, final String password, final boolean
useEncryptedPassword)
+ throws AuthException {
User user = userManager.getEntity(username);
if (user == null || password == null) {
throw new AuthException(
TSStatusCode.USER_NOT_EXIST, String.format("The user %s does not
exist.", username));
}
+ if (useEncryptedPassword) {
+ return password.equals(user.getPassword());
+ }
if (AuthUtils.validatePassword(
password, user.getPassword(),
AsymmetricEncrypt.DigestAlgorithm.SHA_256)) {
return true;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java
index 445b29c0790..3ac33dbeddd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/IAuthorizer.java
@@ -42,7 +42,8 @@ public interface IAuthorizer extends SnapshotProcessor {
* @param password The password of the user.
* @return True if such user exists and the given password is correct, else
return false.
*/
- boolean login(String username, String password) throws AuthException;
+ boolean login(String username, String password, final boolean
useEncryptedPassword)
+ throws AuthException;
/**
* Login for a user in pipe.
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/OpenIdAuthorizer.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/OpenIdAuthorizer.java
index 2da1acfaee9..ee66ee5bced 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/OpenIdAuthorizer.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/authorizer/OpenIdAuthorizer.java
@@ -145,7 +145,8 @@ public class OpenIdAuthorizer extends BasicAuthorizer {
}
@Override
- public boolean login(String token, String password) throws AuthException {
+ public boolean login(String token, String password, final boolean
useEncryptedPassword)
+ throws AuthException {
if (password != null && !password.isEmpty()) {
logger.error(
"JWT Login failed as a non-empty Password was given username
(token): {}", token);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
index 519d263449b..45a1acbfed3 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
@@ -87,28 +87,31 @@ public abstract class PipePluginAgent {
final Map<String, String> processorAttributes,
final Map<String, String> sinkAttributes)
throws Exception {
- validateSource(sourceAttributes);
+ validateSource(pipeName, sourceAttributes);
validateProcessor(processorAttributes);
validateSink(pipeName, sinkAttributes);
}
- protected PipeExtractor validateSource(final Map<String, String>
sourceAttributes)
- throws Exception {
+ protected PipeExtractor validateSource(
+ final String pipeName, final Map<String, String> sourceAttributes)
throws Exception {
final PipeParameters sourceParameters = new
PipeParameters(sourceAttributes);
- final PipeExtractor temporaryExtractor = reflectSource(sourceParameters);
+ final PipeExtractor temporarySource = reflectSource(sourceParameters);
try {
- temporaryExtractor.validate(new
PipeParameterValidator(sourceParameters));
+ temporarySource.validate(new PipeParameterValidator(sourceParameters));
+ temporarySource.customize(
+ sourceParameters,
+ new PipeTaskRuntimeConfiguration(new
PipeTaskTemporaryRuntimeEnvironment(pipeName)));
} finally {
try {
- temporaryExtractor.close();
- } catch (Exception e) {
+ temporarySource.close();
+ } catch (final Exception e) {
LOGGER.warn("Failed to close temporary source: {}", e.getMessage(), e);
}
}
- return temporaryExtractor;
+ return temporarySource;
}
- protected PipeProcessor validateProcessor(Map<String, String>
processorAttributes)
+ protected PipeProcessor validateProcessor(final Map<String, String>
processorAttributes)
throws Exception {
final PipeParameters processorParameters = new
PipeParameters(processorAttributes);
final PipeProcessor temporaryProcessor =
reflectProcessor(processorParameters);
@@ -117,15 +120,15 @@ public abstract class PipePluginAgent {
} finally {
try {
temporaryProcessor.close();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn("Failed to close temporary processor: {}", e.getMessage(),
e);
}
}
return temporaryProcessor;
}
- protected PipeConnector validateSink(String pipeName, Map<String, String>
sinkAttributes)
- throws Exception {
+ protected PipeConnector validateSink(
+ final String pipeName, final Map<String, String> sinkAttributes) throws
Exception {
final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
final PipeConnector temporarySink = reflectSink(sinkParameters);
try {
@@ -137,7 +140,7 @@ public abstract class PipePluginAgent {
} finally {
try {
temporarySink.close();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn("Failed to close temporary connector: {}", e.getMessage(),
e);
}
}
@@ -154,7 +157,7 @@ public abstract class PipePluginAgent {
* @throws PipeException if any exception occurs
*/
public final List<String> getSubProcessorNamesWithSpecifiedParent(
- Class<? extends PipeProcessor> parentClass) throws PipeException {
+ final Class<? extends PipeProcessor> parentClass) throws PipeException {
return
StreamSupport.stream(pipePluginMetaKeeper.getAllPipePluginMeta().spliterator(),
false)
.map(pipePluginMeta -> pipePluginMeta.getPluginName().toLowerCase())
.filter(
@@ -162,7 +165,7 @@ public abstract class PipePluginAgent {
try (PipeProcessor processor =
(PipeProcessor)
pipeProcessorConstructor.reflectPluginByKey(pluginName)) {
return processor.getClass().getSuperclass() == parentClass;
- } catch (Exception e) {
+ } catch (final Exception e) {
return false;
}
})
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
index f16c6387cf5..e0868156ca9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
@@ -98,7 +98,7 @@ public abstract class PipeSubtaskExecutor {
public final synchronized void register(final PipeSubtask subtask) {
if (registeredIdSubtaskMapper.containsKey(subtask.getTaskID())) {
- LOGGER.warn("The subtask {} is already registered.",
subtask.getTaskID());
+ LOGGER.warn("The subtask {} is already registered.",
getSafeSubtaskStr(subtask.getTaskID()));
return;
}
@@ -107,32 +107,36 @@ public abstract class PipeSubtaskExecutor {
subtaskWorkerThreadPoolExecutor, subtaskCallbackListeningExecutor,
schedulerSupplier(this));
}
+ private static String getSafeSubtaskStr(final String subtaskID) {
+ return subtaskID.replaceAll("password=[^,}]*", "password=******");
+ }
+
protected PipeSubtaskScheduler schedulerSupplier(final PipeSubtaskExecutor
executor) {
return new PipeSubtaskScheduler(executor);
}
public final synchronized void start(final String subTaskID) {
if (!registeredIdSubtaskMapper.containsKey(subTaskID)) {
- LOGGER.warn("The subtask {} is not registered.", subTaskID);
+ LOGGER.warn("The subtask {} is not registered.",
getSafeSubtaskStr(subTaskID));
return;
}
final PipeSubtask subtask = registeredIdSubtaskMapper.get(subTaskID);
if (subtask.isSubmittingSelf()) {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("The subtask {} is already running.", subTaskID);
+ LOGGER.debug("The subtask {} is already running.",
getSafeSubtaskStr(subTaskID));
}
} else {
subtask.allowSubmittingSelf();
subtask.submitSelf();
++runningSubtaskNumber;
- LOGGER.info("The subtask {} is started to submit self.", subTaskID);
+ LOGGER.info("The subtask {} is started to submit self.",
getSafeSubtaskStr(subTaskID));
}
}
public final synchronized void stop(final String subTaskID) {
if (!registeredIdSubtaskMapper.containsKey(subTaskID)) {
- LOGGER.warn("The subtask {} is not registered.", subTaskID);
+ LOGGER.warn("The subtask {} is not registered.",
getSafeSubtaskStr(subTaskID));
return;
}
@@ -149,9 +153,9 @@ public abstract class PipeSubtaskExecutor {
if (subtask != null) {
try {
subtask.close();
- LOGGER.info("The subtask {} is closed successfully.", subTaskID);
+ LOGGER.info("The subtask {} is closed successfully.",
getSafeSubtaskStr(subTaskID));
} catch (final Exception e) {
- LOGGER.error("Failed to close the subtask {}.", subTaskID, e);
+ LOGGER.error("Failed to close the subtask {}.",
getSafeSubtaskStr(subTaskID), e);
}
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
index 095bd243073..04cf7f27bd2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
@@ -291,6 +291,9 @@ public abstract class IoTDBNonDataRegionSource extends
IoTDBSource {
//////////////////////////// APIs provided for metric framework
////////////////////////////
public long getUnTransferredEventCount() {
+ if (Objects.isNull(pipeTaskMeta)) {
+ return 0L;
+ }
return !(pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex)
? getListeningQueue().getTailIndex()
- ((MetaProgressIndex) pipeTaskMeta.getProgressIndex()).getIndex()
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
index 91a149c9f66..33acc06123e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
@@ -30,20 +30,25 @@ import org.apache.iotdb.pipe.api.PipeExtractor;
import org.apache.iotdb.pipe.api.annotation.TableModel;
import org.apache.iotdb.pipe.api.annotation.TreeModel;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import javax.annotation.Nonnull;
+
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_SKIP_IF_NO_PRIVILEGES;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_SKIP_IF_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_IOTDB_USER_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_SKIP_IF_KEY;
@@ -154,13 +159,14 @@ public abstract class IoTDBSource implements
PipeExtractor {
public void customize(
final PipeParameters parameters, final PipeExtractorRuntimeConfiguration
configuration)
throws Exception {
- final PipeTaskSourceRuntimeEnvironment environment =
- ((PipeTaskSourceRuntimeEnvironment)
configuration.getRuntimeEnvironment());
+ final PipeRuntimeEnvironment environment =
configuration.getRuntimeEnvironment();
regionId = environment.getRegionId();
pipeName = environment.getPipeName();
creationTime = environment.getCreationTime();
taskID = pipeName + "_" + regionId + "_" + creationTime;
- pipeTaskMeta = environment.getPipeTaskMeta();
+ if (environment instanceof PipeTaskSourceRuntimeEnvironment) {
+ pipeTaskMeta = ((PipeTaskSourceRuntimeEnvironment)
environment).getPipeTaskMeta();
+ }
final boolean isDoubleLiving =
parameters.getBooleanOrDefault(
@@ -199,8 +205,15 @@ public abstract class IoTDBSource implements PipeExtractor
{
userEntity.setAuditLogOperation(AuditLogOperation.QUERY);
skipIfNoPrivileges = getSkipIfNoPrivileges(parameters);
+ final String password =
+ parameters.getStringByKeys(EXTRACTOR_IOTDB_PASSWORD_KEY,
SOURCE_IOTDB_PASSWORD_KEY);
+ if (Objects.nonNull(password)) {
+ login(password);
+ }
}
+ protected abstract void login(final @Nonnull String password);
+
public static boolean getSkipIfNoPrivileges(final PipeParameters
extractorParameters) {
final String extractorSkipIfValue =
extractorParameters
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index b6569d943fb..92312ee81a3 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -445,6 +445,7 @@ struct TAuthizedPatternTreeResp {
struct TLoginReq {
1: required string userrname
2: required string password
+ 3: optional bool useEncryptedPassword
}
// reqtype : tree, relational, system