This is an automated email from the ASF dual-hosted git repository.
neuyilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5d48b1ecd8 [IOTDB-3425] [IOTDB-3415] add permission operate to
LocalConfigNode && Abstract an interface for Standalone Authority Check (#6196)
5d48b1ecd8 is described below
commit 5d48b1ecd8f71b17005cce63a0e6e678af82b035
Author: 任宇华 <[email protected]>
AuthorDate: Thu Jun 9 16:52:14 2022 +0800
[IOTDB-3425] [IOTDB-3415] add permission operate to LocalConfigNode &&
Abstract an interface for Standalone Authority Check (#6196)
---
.../iotdb/confignode/persistence/AuthorInfo.java | 1 +
.../org/apache/iotdb/db/auth/AuthorityChecker.java | 8 +-
.../org/apache/iotdb/db/auth/AuthorityFetcher.java | 182 -----------
.../apache/iotdb/db/auth/AuthorizerManager.java | 225 +++-----------
.../iotdb/db/auth/ClusterAuthorityFetcher.java | 341 +++++++++++++++++++++
.../apache/iotdb/db/auth/IAuthorityFetcher.java | 42 +++
.../iotdb/db/auth/StandaloneAuthorityFetcher.java | 133 ++++++++
.../iotdb/db/localconfignode/LocalConfigNode.java | 234 ++++++++++++++
...thorizerConfigTask.java => AuthorizerTask.java} | 42 ++-
.../plan/execution/config/ConfigTaskVisitor.java | 2 +-
.../iotdb/db/auth/AuthorizerManagerTest.java | 38 ++-
.../db/mpp/plan/StandaloneCoordinatorTest.java | 8 +
12 files changed, 855 insertions(+), 401 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
index 35a1e12e97..6fa365235d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
@@ -104,6 +104,7 @@ public class AuthorInfo implements SnapshotProcessor {
for (String path : paths) {
if (!checkOnePath(username, path, permission)) {
status = false;
+ break;
}
}
} catch (AuthException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
index 94c6a10001..122304698c 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.query.control.SessionManager;
-import org.apache.iotdb.rpc.ConfigNodeConnectionException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -104,8 +103,7 @@ public class AuthorityChecker {
* @return if permission-check is passed
*/
public static boolean checkPermission(
- String username, List<? extends PartialPath> paths, StatementType type,
String targetUser)
- throws AuthException, ConfigNodeConnectionException {
+ String username, List<? extends PartialPath> paths, StatementType type,
String targetUser) {
if (SUPER_USER.equals(username)) {
return true;
}
@@ -128,7 +126,7 @@ public class AuthorityChecker {
allPath.add(AuthUtils.ROOT_PATH_PRIVILEGE);
}
- TSStatus status = authorizerManager.checkPermissionCache(username,
allPath, permission);
+ TSStatus status = authorizerManager.checkPath(username, allPath,
permission);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return true;
} else {
@@ -170,7 +168,7 @@ public class AuthorityChecker {
/** Check whether specific user has the authorization to given plan. */
public static boolean checkAuthorization(Statement statement, String
username)
- throws AuthException, ConfigNodeConnectionException {
+ throws AuthException {
if (!statement.isAuthenticationRequired()) {
return true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityFetcher.java b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityFetcher.java
deleted file mode 100644
index 78d92bb33e..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityFetcher.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.auth;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.consensus.PartitionRegionId;
-import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
-import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
-import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
-import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
-import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.db.client.ConfigNodeClient;
-import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
-import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
-import org.apache.iotdb.db.mpp.plan.execution.config.AuthorizerConfigTask;
-import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
-import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
-import org.apache.iotdb.rpc.ConfigNodeConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.utils.Binary;
-
-import com.google.common.util.concurrent.SettableFuture;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-public class AuthorityFetcher {
-
- private static final Logger logger =
LoggerFactory.getLogger(AuthorizerConfigTask.class);
-
- private static final IClientManager<PartitionRegionId, ConfigNodeClient>
configNodeClientManager =
- new IClientManager.Factory<PartitionRegionId, ConfigNodeClient>()
- .createClientManager(new
DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
-
- public static TPermissionInfoResp checkPath(String username, List<String>
allPath, int permission)
- throws ConfigNodeConnectionException {
- TCheckUserPrivilegesReq req = new TCheckUserPrivilegesReq(username,
allPath, permission);
- TPermissionInfoResp status = null;
- try (ConfigNodeClient configNodeClient =
-
configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
- // Send request to some API server
- status = configNodeClient.checkUserPrivileges(req);
- } catch (TException | IOException e) {
- throw new ConfigNodeConnectionException("Couldn't connect config node");
- } finally {
- if (status == null) {
- status = new TPermissionInfoResp();
- }
- }
- return status;
- }
-
- /** Check the user */
- public static TPermissionInfoResp checkUser(String username, String password)
- throws ConfigNodeConnectionException {
- TLoginReq req = new TLoginReq(username, password);
- TPermissionInfoResp status = null;
- try (ConfigNodeClient configNodeClient =
-
configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
- // Send request to some API server
- status = configNodeClient.login(req);
- } catch (TException | IOException e) {
- throw new ConfigNodeConnectionException("Couldn't connect config node");
- } finally {
- if (status == null) {
- status = new TPermissionInfoResp();
- }
- }
- return status;
- }
-
- public static SettableFuture<ConfigTaskResult> operatePermission(
- TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
- SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- try {
- // Send request to some API server
- TSStatus tsStatus = configNodeClient.operatePermission(authorizerReq);
- // Get response or throw exception
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
- logger.error(
- "Failed to execute {} in config node, status is {}.",
- AuthorOperator.AuthorType.values()[authorizerReq.getAuthorType()]
- .toString()
- .toLowerCase(Locale.ROOT),
- tsStatus);
- future.setException(new StatementExecutionException(tsStatus));
- } else {
- future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
- }
- } catch (TException e) {
- logger.error("Failed to connect to config node.");
- future.setException(e);
- }
- // If the action is executed successfully, return the Future.
- // If your operation is async, you can return the corresponding future
directly.
- return future;
- }
-
- public static SettableFuture<ConfigTaskResult> queryPermission(
- TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
- SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- TAuthorizerResp authorizerResp;
- try {
- // Send request to some API server
- authorizerResp = configNodeClient.queryPermission(authorizerReq);
- // Get response or throw exception
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
authorizerResp.getStatus().getCode()) {
- logger.error(
- "Failed to execute {} in config node, status is {}.",
- AuthorOperator.AuthorType.values()[authorizerReq.getAuthorType()]
- .toString()
- .toLowerCase(Locale.ROOT),
- authorizerResp.getStatus());
- future.setException(new
StatementExecutionException(authorizerResp.getStatus()));
- } else {
- // build TSBlock
- List<TSDataType> types = new ArrayList<>();
- Map<String, List<String>> authorizerInfo =
authorizerResp.getAuthorizerInfo();
- for (int i = 0; i < authorizerInfo.size(); i++) {
- types.add(TSDataType.TEXT);
- }
- TsBlockBuilder builder = new TsBlockBuilder(types);
- List<ColumnHeader> headerList = new ArrayList<>();
-
- for (String header : authorizerInfo.keySet()) {
- headerList.add(new ColumnHeader(header, TSDataType.TEXT));
- }
- // The Time column will be ignored by the setting of ColumnHeader.
- // So we can put a meaningless value here
- for (String value :
authorizerInfo.get(headerList.get(0).getColumnName())) {
- builder.getTimeColumnBuilder().writeLong(0L);
- builder.getColumnBuilder(0).writeBinary(new Binary(value));
- builder.declarePosition();
- }
- for (int i = 1; i < headerList.size(); i++) {
- for (String value :
authorizerInfo.get(headerList.get(i).getColumnName())) {
- builder.getColumnBuilder(i).writeBinary(new Binary(value));
- }
- }
-
- DatasetHeader datasetHeader = new DatasetHeader(headerList, true);
- future.set(
- new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(),
datasetHeader));
- }
- } catch (TException e) {
- logger.error("Failed to connect to config node.");
- future.setException(e);
- }
- // If the action is executed successfully, return the Future.
- // If your operation is async, you can return the corresponding future
directly.
- return future;
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/AuthorizerManager.java b/server/src/main/java/org/apache/iotdb/db/auth/AuthorizerManager.java
index 5568a018a7..5d42da83a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/AuthorizerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/AuthorizerManager.java
@@ -23,21 +23,20 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.auth.authorizer.BasicAuthorizer;
import org.apache.iotdb.commons.auth.authorizer.IAuthorizer;
-import org.apache.iotdb.commons.auth.entity.PathPrivilege;
import org.apache.iotdb.commons.auth.entity.Role;
import org.apache.iotdb.commons.auth.entity.User;
-import org.apache.iotdb.commons.utils.AuthUtils;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
-import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.rpc.ConfigNodeConnectionException;
-import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -46,11 +45,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class AuthorizerManager implements IAuthorizer {
@@ -60,23 +57,17 @@ public class AuthorizerManager implements IAuthorizer {
private IAuthorizer iAuthorizer;
private ReentrantReadWriteLock authReadWriteLock;
private IoTDBDescriptor conf = IoTDBDescriptor.getInstance();
-
- private Cache<String, User> userCache =
- Caffeine.newBuilder()
- .maximumSize(conf.getConfig().getAuthorCacheSize())
- .expireAfterAccess(conf.getConfig().getAuthorCacheExpireTime(),
TimeUnit.MINUTES)
- .build();
-
- private Cache<String, Role> roleCache =
- Caffeine.newBuilder()
- .maximumSize(conf.getConfig().getAuthorCacheSize())
- .expireAfterAccess(conf.getConfig().getAuthorCacheExpireTime(),
TimeUnit.MINUTES)
- .build();
+ private IAuthorityFetcher authorityFetcher;
public AuthorizerManager() {
try {
iAuthorizer = BasicAuthorizer.getInstance();
authReadWriteLock = new ReentrantReadWriteLock();
+ if (conf.getConfig().isClusterMode()) {
+ authorityFetcher = ClusterAuthorityFetcher.getInstance();
+ } else {
+ authorityFetcher = StandaloneAuthorityFetcher.getInstance();
+ }
} catch (AuthException e) {
logger.error(e.getMessage());
}
@@ -373,103 +364,35 @@ public class AuthorizerManager implements IAuthorizer {
}
}
- public TSStatus checkPermissionCache(String username, List<String> allPath,
int permission)
- throws AuthException, ConfigNodeConnectionException {
+ /** Check the path */
+ public TSStatus checkPath(String username, List<String> allPath, int
permission) {
authReadWriteLock.readLock().lock();
try {
- User user = userCache.getIfPresent(username);
- if (user != null) {
- for (String path : allPath) {
- if (!user.checkPrivilege(path, permission)) {
- if (user.getRoleList().isEmpty()) {
- return RpcUtils.getStatus(TSStatusCode.NO_PERMISSION_ERROR);
- }
- boolean status = false;
- for (String roleName : user.getRoleList()) {
- Role role = roleCache.getIfPresent(roleName);
- // It is detected that the role of the user does not exist in
the cache, indicating
- // that the permission information of the role has changed.
- // The user cache needs to be initialized
- if (role == null) {
- invalidateCache(username, "");
- return checkPath(username, allPath, permission);
- }
- status = role.checkPrivilege(path, permission);
- if (status) {
- break;
- }
- }
- if (!status) {
- return RpcUtils.getStatus(TSStatusCode.NO_PERMISSION_ERROR);
- }
- }
- }
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- } else {
- return checkPath(username, allPath, permission);
- }
+ return authorityFetcher.checkUserPrivileges(username, allPath,
permission);
} finally {
authReadWriteLock.readLock().unlock();
}
}
- public TSStatus checkPath(String username, List<String> allPath, int
permission)
- throws ConfigNodeConnectionException {
- TPermissionInfoResp tPermissionInfoResp =
- AuthorityFetcher.checkPath(username, allPath, permission);
- if (tPermissionInfoResp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- userCache.put(username, cacheUser(tPermissionInfoResp));
- return tPermissionInfoResp.getStatus();
- } else {
- return tPermissionInfoResp.getStatus();
- }
- }
-
/** Check the user */
public TSStatus checkUser(String username, String password) throws
ConfigNodeConnectionException {
- // TODO:(Local AuthCheck)
- if (!conf.getConfig().isClusterMode()) {
- try {
- if (login(username, password)) {
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- } else {
- return RpcUtils.getStatus(
- TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR, "Authentication
failed.");
- }
- } catch (AuthException e) {
- return RpcUtils.getStatus(TSStatusCode.AUTHENTICATION_ERROR,
e.getMessage());
- }
- }
authReadWriteLock.readLock().lock();
try {
- User user = userCache.getIfPresent(username);
- if (user != null) {
- if (password != null && AuthUtils.validatePassword(password,
user.getPassword())) {
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- } else {
- return RpcUtils.getStatus(
- TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR, "Authentication
failed.");
- }
- } else {
- TPermissionInfoResp tPermissionInfoResp =
AuthorityFetcher.checkUser(username, password);
- if (tPermissionInfoResp.getStatus().getCode()
- == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- userCache.put(username, cacheUser(tPermissionInfoResp));
- return tPermissionInfoResp.getStatus();
- } else {
- return tPermissionInfoResp.getStatus();
- }
- }
+ return authorityFetcher.checkUser(username, password);
} finally {
authReadWriteLock.readLock().unlock();
}
}
+ public boolean invalidateCache(String username, String roleName) {
+ return ClusterAuthorityFetcher.getInstance().invalidateCache(username,
roleName);
+ }
+
public SettableFuture<ConfigTaskResult> queryPermission(
- TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
+ TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) throws
TException {
authReadWriteLock.readLock().lock();
try {
- return AuthorityFetcher.queryPermission(authorizerReq, configNodeClient);
+ return authorityFetcher.queryPermission(authorizerReq, configNodeClient);
} finally {
authReadWriteLock.readLock().unlock();
}
@@ -479,96 +402,40 @@ public class AuthorizerManager implements IAuthorizer {
TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
authReadWriteLock.writeLock().lock();
try {
- return AuthorityFetcher.operatePermission(authorizerReq,
configNodeClient);
+ return authorityFetcher.operatePermission(authorizerReq,
configNodeClient);
} finally {
authReadWriteLock.writeLock().unlock();
}
}
- /** cache user */
- public User cacheUser(TPermissionInfoResp tPermissionInfoResp) {
- User user = new User();
- List<String> privilegeList =
tPermissionInfoResp.getUserInfo().getPrivilegeList();
- List<PathPrivilege> pathPrivilegeList = new ArrayList<>();
- user.setName(tPermissionInfoResp.getUserInfo().getUsername());
- user.setPassword(tPermissionInfoResp.getUserInfo().getPassword());
- for (int i = 0; i < privilegeList.size(); i++) {
- String path = privilegeList.get(i);
- String privilege = privilegeList.get(++i);
- pathPrivilegeList.add(toPathPrivilege(path, privilege));
- }
- user.setPrivilegeList(pathPrivilegeList);
- user.setRoleList(tPermissionInfoResp.getUserInfo().getRoleList());
- for (String roleName : tPermissionInfoResp.getRoleInfo().keySet()) {
- roleCache.put(roleName, cacheRole(roleName, tPermissionInfoResp));
- }
- return user;
- }
-
- /** cache role */
- public Role cacheRole(String roleName, TPermissionInfoResp
tPermissionInfoResp) {
- Role role = new Role();
- List<String> privilegeList =
tPermissionInfoResp.getRoleInfo().get(roleName).getPrivilegeList();
- List<PathPrivilege> pathPrivilegeList = new ArrayList<>();
-
role.setName(tPermissionInfoResp.getRoleInfo().get(roleName).getRoleName());
- for (int i = 0; i < privilegeList.size(); i++) {
- String path = privilegeList.get(i);
- String privilege = privilegeList.get(++i);
- pathPrivilegeList.add(toPathPrivilege(path, privilege));
- }
- role.setPrivilegeList(pathPrivilegeList);
- return role;
- }
-
- /**
- * Initialize user and role cache information.
- *
- * <p>If the permission information of the role changes, only the role cache
information is
- * cleared. During permission checking, if the role belongs to a user, the
user will be
- * initialized.
- */
- public boolean invalidateCache(String username, String roleName) {
- if (userCache.getIfPresent(username) != null) {
- List<String> roleList = userCache.getIfPresent(username).getRoleList();
- if (!roleList.isEmpty()) {
- roleCache.invalidateAll(roleList);
- }
- userCache.invalidate(username);
+ /** build TSBlock */
+ public SettableFuture<ConfigTaskResult> buildTSBlock(Map<String,
List<String>> authorizerInfo) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ List<TSDataType> types = new ArrayList<>();
+ for (int i = 0; i < authorizerInfo.size(); i++) {
+ types.add(TSDataType.TEXT);
}
- if (roleCache.getIfPresent(roleName) != null) {
- roleCache.invalidate(roleName);
+ TsBlockBuilder builder = new TsBlockBuilder(types);
+ List<ColumnHeader> headerList = new ArrayList<>();
+
+ for (String header : authorizerInfo.keySet()) {
+ headerList.add(new ColumnHeader(header, TSDataType.TEXT));
}
- if (userCache.getIfPresent(username) != null &&
roleCache.getIfPresent(roleName) != null) {
- logger.error("datanode cache initialization failed");
- return false;
+ // The Time column will be ignored by the setting of ColumnHeader.
+ // So we can put a meaningless value here
+ for (String value : authorizerInfo.get(headerList.get(0).getColumnName()))
{
+ builder.getTimeColumnBuilder().writeLong(0L);
+ builder.getColumnBuilder(0).writeBinary(new Binary(value));
+ builder.declarePosition();
}
- return true;
- }
-
- /**
- * Convert user privilege information obtained from confignode to
PathPrivilege
- *
- * @param path permission path
- * @param privilege privilegeIds
- * @return
- */
- private PathPrivilege toPathPrivilege(String path, String privilege) {
- PathPrivilege pathPrivilege = new PathPrivilege();
- String[] privileges = privilege.replace(" ", "").split(",");
- Set<Integer> privilegeIds = new HashSet<>();
- for (String p : privileges) {
- privilegeIds.add(Integer.parseInt(p));
+ for (int i = 1; i < headerList.size(); i++) {
+ for (String value :
authorizerInfo.get(headerList.get(i).getColumnName())) {
+ builder.getColumnBuilder(i).writeBinary(new Binary(value));
+ }
}
- pathPrivilege.setPrivileges(privilegeIds);
- pathPrivilege.setPath(path);
- return pathPrivilege;
- }
-
- public Cache<String, User> getUserCache() {
- return userCache;
- }
- public Cache<String, Role> getRoleCache() {
- return roleCache;
+ DatasetHeader datasetHeader = new DatasetHeader(headerList, true);
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS,
builder.build(), datasetHeader));
+ return future;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java b/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
new file mode 100644
index 0000000000..0da77fde91
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.auth;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.auth.AuthException;
+import org.apache.iotdb.commons.auth.entity.PathPrivilege;
+import org.apache.iotdb.commons.auth.entity.Role;
+import org.apache.iotdb.commons.auth.entity.User;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.consensus.PartitionRegionId;
+import org.apache.iotdb.commons.utils.AuthUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
+import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
+import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class ClusterAuthorityFetcher implements IAuthorityFetcher {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ClusterAuthorityFetcher.class);
+
+ private IoTDBDescriptor conf = IoTDBDescriptor.getInstance();
+
+ private Cache<String, User> userCache =
+ Caffeine.newBuilder()
+ .maximumSize(conf.getConfig().getAuthorCacheSize())
+ .expireAfterAccess(conf.getConfig().getAuthorCacheExpireTime(),
TimeUnit.MINUTES)
+ .build();
+
+ private Cache<String, Role> roleCache =
+ Caffeine.newBuilder()
+ .maximumSize(conf.getConfig().getAuthorCacheSize())
+ .expireAfterAccess(conf.getConfig().getAuthorCacheExpireTime(),
TimeUnit.MINUTES)
+ .build();
+
+ private static final IClientManager<PartitionRegionId, ConfigNodeClient>
configNodeClientManager =
+ new IClientManager.Factory<PartitionRegionId, ConfigNodeClient>()
+ .createClientManager(new
DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+
+ private static final class ClusterAuthorityFetcherHolder {
+ private static final ClusterAuthorityFetcher INSTANCE = new
ClusterAuthorityFetcher();
+
+ private ClusterAuthorityFetcherHolder() {}
+ }
+
+ public static ClusterAuthorityFetcher getInstance() {
+ return ClusterAuthorityFetcher.ClusterAuthorityFetcherHolder.INSTANCE;
+ }
+
+  /**
+   * Checks whether {@code username} holds {@code permission} on every path in {@code allPath},
+   * answering from the local user/role caches when possible.
+   *
+   * <p>A user that is not cached is resolved through {@link #checkPath}. For a cached user, a path
+   * passes if the user itself or any of its roles grants the permission. If one of the user's
+   * roles is missing from the role cache, the user's cache entries are invalidated and the whole
+   * check is delegated to {@link #checkPath}.
+   *
+   * @param username user whose privileges are checked
+   * @param allPath paths the permission is required on
+   * @param permission privilege id to check
+   * @return SUCCESS_STATUS if all paths pass, NO_PERMISSION_ERROR on the first denied path,
+   *     EXECUTE_STATEMENT_ERROR if a privilege check throws {@link AuthException}, or the status
+   *     produced by {@link #checkPath}
+   */
+  @Override
+  public TSStatus checkUserPrivileges(String username, List<String> allPath, int permission) {
+    User cachedUser = userCache.getIfPresent(username);
+    if (cachedUser == null) {
+      return checkPath(username, allPath, permission);
+    }
+    for (String path : allPath) {
+      try {
+        if (cachedUser.checkPrivilege(path, permission)) {
+          continue;
+        }
+        if (cachedUser.getRoleList().isEmpty()) {
+          return RpcUtils.getStatus(TSStatusCode.NO_PERMISSION_ERROR);
+        }
+        boolean grantedByRole = false;
+        for (String roleName : cachedUser.getRoleList()) {
+          Role cachedRole = roleCache.getIfPresent(roleName);
+          if (cachedRole == null) {
+            // A role of this user is no longer cached, which indicates that the role's
+            // permission information has changed: drop the user and ask the config node.
+            invalidateCache(username, "");
+            return checkPath(username, allPath, permission);
+          }
+          if (cachedRole.checkPrivilege(path, permission)) {
+            grantedByRole = true;
+            break;
+          }
+        }
+        if (!grantedByRole) {
+          return RpcUtils.getStatus(TSStatusCode.NO_PERMISSION_ERROR);
+        }
+      } catch (AuthException e) {
+        return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
+      }
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> operatePermission(
+ TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
+ // Send request to some API server
+ TSStatus tsStatus = configNodeClient.operatePermission(authorizerReq);
+ // Get response or throw exception
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+ logger.error(
+ "Failed to execute {} in config node, status is {}.",
+ AuthorOperator.AuthorType.values()[authorizerReq.getAuthorType()]
+ .toString()
+ .toLowerCase(Locale.ROOT),
+ tsStatus);
+ future.setException(new StatementExecutionException(tsStatus));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (TException e) {
+ logger.error("Failed to connect to config node.");
+ future.setException(e);
+ }
+ // If the action is executed successfully, return the Future.
+ // If your operation is async, you can return the corresponding future
directly.
+ return future;
+ }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> queryPermission(
+ TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ TAuthorizerResp authorizerResp = new TAuthorizerResp();
+ try {
+ // Send request to some API server
+ authorizerResp = configNodeClient.queryPermission(authorizerReq);
+ // Get response or throw exception
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
authorizerResp.getStatus().getCode()) {
+ logger.error(
+ "Failed to execute {} in config node, status is {}.",
+ AuthorOperator.AuthorType.values()[authorizerReq.getAuthorType()]
+ .toString()
+ .toLowerCase(Locale.ROOT),
+ authorizerResp.getStatus());
+ future.setException(new
StatementExecutionException(authorizerResp.getStatus()));
+ } else {
+ future =
AuthorizerManager.getInstance().buildTSBlock(authorizerResp.getAuthorizerInfo());
+ }
+ } catch (TException e) {
+ logger.error("Failed to connect to config node.");
+ authorizerResp.setStatus(
+ RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, "Failed to connect to
config node."));
+ }
+ return future;
+ }
+
+ @Override
+ public TSStatus checkUser(String username, String password) {
+ User user = userCache.getIfPresent(username);
+ if (user != null) {
+ if (password != null && AuthUtils.validatePassword(password,
user.getPassword())) {
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ } else {
+ return RpcUtils.getStatus(
+ TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR, "Authentication failed.");
+ }
+ } else {
+ TLoginReq req = new TLoginReq(username, password);
+ TPermissionInfoResp status = null;
+ try (ConfigNodeClient configNodeClient =
+
configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+ // Send request to some API server
+ status = configNodeClient.login(req);
+ } catch (TException | IOException e) {
+ logger.error("Failed to connect to config node.");
+ status = new TPermissionInfoResp();
+ status.setStatus(
+ RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, "Failed to connect to
config node."));
+ } finally {
+ if (status == null) {
+ status = new TPermissionInfoResp();
+ }
+ }
+ if (status.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ userCache.put(username, cacheUser(status));
+ return status.getStatus();
+ } else {
+ return status.getStatus();
+ }
+ }
+ }
+
+  /**
+   * Asks the config node whether {@code username} holds {@code permission} on all given paths
+   * and, on success, caches the user (and roles) carried by the response.
+   *
+   * @param username user whose privileges are checked
+   * @param allPath paths the permission is required on
+   * @param permission privilege id to check
+   * @return the status returned by the config node, or EXECUTE_STATEMENT_ERROR if it cannot be
+   *     reached
+   */
+  public TSStatus checkPath(String username, List<String> allPath, int permission) {
+    TCheckUserPrivilegesReq request = new TCheckUserPrivilegesReq(username, allPath, permission);
+    TPermissionInfoResp response;
+    try (ConfigNodeClient client =
+        configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      response = client.checkUserPrivileges(request);
+    } catch (TException | IOException e) {
+      logger.error("Failed to connect to config node.");
+      response = new TPermissionInfoResp();
+      response.setStatus(
+          RpcUtils.getStatus(
+              TSStatusCode.EXECUTE_STATEMENT_ERROR, "Failed to connect to config node."));
+    }
+
+    TSStatus outcome = response.getStatus();
+    if (outcome.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      userCache.put(username, cacheUser(response));
+    }
+    return outcome;
+  }
+
+ /**
+ * Initialize user and role cache information.
+ *
+ * <p>If the permission information of the role changes, only the role cache
information is
+ * cleared. During permission checking, if the role belongs to a user, the
user will be
+ * initialized.
+ */
+ public boolean invalidateCache(String username, String roleName) {
+ if (username != null) {
+ if (userCache.getIfPresent(username) != null) {
+ List<String> roleList = userCache.getIfPresent(username).getRoleList();
+ if (!roleList.isEmpty()) {
+ roleCache.invalidateAll(roleList);
+ }
+ userCache.invalidate(username);
+ }
+ if (userCache.getIfPresent(username) != null) {
+ logger.error("datanode cache initialization failed");
+ return false;
+ }
+ }
+ if (roleName != null) {
+ if (roleCache.getIfPresent(roleName) != null) {
+ roleCache.invalidate(roleName);
+ }
+ if (roleCache.getIfPresent(roleName) != null) {
+ logger.error("datanode cache initialization failed");
+ return false;
+ }
+ }
+ return true;
+ }
+
+  /**
+   * Builds a {@link User} from a permission response and stores every role carried by the
+   * response in the role cache.
+   *
+   * <p>The user's privilege list is read as a flat sequence of pairs: a path followed by the
+   * privilege ids granted on that path.
+   *
+   * @param tPermissionInfoResp response holding the user info and role info
+   * @return the constructed user; the caller decides whether to put it into the user cache
+   */
+  public User cacheUser(TPermissionInfoResp tPermissionInfoResp) {
+    User cached = new User();
+    List<String> flatPrivileges = tPermissionInfoResp.getUserInfo().getPrivilegeList();
+    cached.setName(tPermissionInfoResp.getUserInfo().getUsername());
+    cached.setPassword(tPermissionInfoResp.getUserInfo().getPassword());
+
+    List<PathPrivilege> parsedPrivileges = new ArrayList<>();
+    for (int pathIdx = 0; pathIdx < flatPrivileges.size(); pathIdx += 2) {
+      String path = flatPrivileges.get(pathIdx);
+      String privilegeIds = flatPrivileges.get(pathIdx + 1);
+      parsedPrivileges.add(toPathPrivilege(path, privilegeIds));
+    }
+    cached.setPrivilegeList(parsedPrivileges);
+    cached.setRoleList(tPermissionInfoResp.getUserInfo().getRoleList());
+
+    tPermissionInfoResp
+        .getRoleInfo()
+        .keySet()
+        .forEach(roleName -> roleCache.put(roleName, cacheRole(roleName, tPermissionInfoResp)));
+    return cached;
+  }
+
+  /**
+   * Builds a {@link Role} from the entry named {@code roleName} in a permission response.
+   *
+   * <p>The role's privilege list is read as a flat sequence of pairs: a path followed by the
+   * privilege ids granted on that path.
+   *
+   * @param roleName key of the role inside the response's role info
+   * @param tPermissionInfoResp response holding the role info
+   * @return the constructed role
+   */
+  public Role cacheRole(String roleName, TPermissionInfoResp tPermissionInfoResp) {
+    Role cached = new Role();
+    List<String> flatPrivileges =
+        tPermissionInfoResp.getRoleInfo().get(roleName).getPrivilegeList();
+    cached.setName(tPermissionInfoResp.getRoleInfo().get(roleName).getRoleName());
+
+    List<PathPrivilege> parsedPrivileges = new ArrayList<>();
+    for (int pathIdx = 0; pathIdx < flatPrivileges.size(); pathIdx += 2) {
+      String path = flatPrivileges.get(pathIdx);
+      String privilegeIds = flatPrivileges.get(pathIdx + 1);
+      parsedPrivileges.add(toPathPrivilege(path, privilegeIds));
+    }
+    cached.setPrivilegeList(parsedPrivileges);
+    return cached;
+  }
+
+  /**
+   * Converts one path/privilege pair obtained from the config node into a {@link PathPrivilege}.
+   *
+   * @param path permission path
+   * @param privilege comma-separated integer privilege ids; spaces are ignored
+   * @return a {@link PathPrivilege} carrying the parsed ids and the given path
+   */
+  private PathPrivilege toPathPrivilege(String path, String privilege) {
+    Set<Integer> parsedIds = new HashSet<>();
+    String compact = privilege.replace(" ", "");
+    for (String idText : compact.split(",")) {
+      parsedIds.add(Integer.parseInt(idText));
+    }
+    PathPrivilege converted = new PathPrivilege();
+    converted.setPrivileges(parsedIds);
+    converted.setPath(path);
+    return converted;
+  }
+
+ /** Returns the live user cache (not a copy), keyed by user name. */
+ public Cache<String, User> getUserCache() {
+ return userCache;
+ }
+
+ /** Returns the live role cache (not a copy), keyed by role name. */
+ public Cache<String, Role> getRoleCache() {
+ return roleCache;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java
b/server/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java
new file mode 100644
index 0000000000..fc618d1e1a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/auth/IAuthorityFetcher.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.auth;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.List;
+
+/**
+ * Operations for authenticating users, checking their privileges, and executing authorization
+ * statements. Implemented by ClusterAuthorityFetcher and StandaloneAuthorityFetcher.
+ */
+public interface IAuthorityFetcher {
+
+ /**
+  * Authenticates a user.
+  *
+  * @param username user name to authenticate
+  * @param password password supplied by the client
+  * @return a status describing whether the login succeeded
+  */
+ TSStatus checkUser(String username, String password);
+
+ /**
+  * Checks whether a user holds a permission on all given paths.
+  *
+  * @param username user whose privileges are checked
+  * @param allPath paths the permission is required on
+  * @param permission privilege id to check
+  * @return a status describing whether the permission is granted
+  */
+ TSStatus checkUserPrivileges(String username, List<String> allPath, int
permission);
+
+ /**
+  * Executes a permission-modifying request.
+  *
+  * @param authorizerReq the request to execute
+  * @param configNodeClient client for the config node; AuthorizerTask passes {@code null} when
+  *     not in cluster mode
+  * @return a future carrying the task result or the failure
+  */
+ SettableFuture<ConfigTaskResult> operatePermission(
+ TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient);
+
+ /**
+  * Executes a permission query.
+  *
+  * @param authorizerReq the query to execute
+  * @param configNodeClient client for the config node; AuthorizerTask passes {@code null} when
+  *     not in cluster mode
+  * @return a future carrying the query result or the failure
+  */
+ SettableFuture<ConfigTaskResult> queryPermission(
+ TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient);
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/auth/StandaloneAuthorityFetcher.java
b/server/src/main/java/org/apache/iotdb/db/auth/StandaloneAuthorityFetcher.java
new file mode 100644
index 0000000000..d12d5c8329
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/auth/StandaloneAuthorityFetcher.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.auth;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.auth.AuthException;
+import org.apache.iotdb.commons.utils.AuthUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class StandaloneAuthorityFetcher implements IAuthorityFetcher {
+
+ private static final Logger logger =
LoggerFactory.getLogger(StandaloneAuthorityFetcher.class);
+
+ /** Local config node that login and privilege checks are delegated to. */
+ private LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
+
+ /** Holds the shared instance returned by {@link #getInstance()}. */
+ private static final class StandaloneAuthorityFetcherHolder {
+ private static final StandaloneAuthorityFetcher INSTANCE = new
StandaloneAuthorityFetcher();
+
+ private StandaloneAuthorityFetcherHolder() {}
+ }
+
+ /** Returns the shared {@link StandaloneAuthorityFetcher} instance. */
+ public static StandaloneAuthorityFetcher getInstance() {
+ return
StandaloneAuthorityFetcher.StandaloneAuthorityFetcherHolder.INSTANCE;
+ }
+
+ @Override
+ public TSStatus checkUser(String username, String password) {
+ try {
+ if (localConfigNode.login(username, password)) {
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ } else {
+ return RpcUtils.getStatus(
+ TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR, "Authentication failed.");
+ }
+ } catch (AuthException e) {
+ return RpcUtils.getStatus(TSStatusCode.AUTHENTICATION_ERROR,
e.getMessage());
+ }
+ }
+
+  /**
+   * Checks whether {@code username} holds {@code permission} on every path in {@code allPath}.
+   *
+   * <p>Checking stops at the first path that is denied or whose check fails.
+   *
+   * @param username user whose privileges are checked
+   * @param allPath paths the permission is required on
+   * @param permission privilege id to check
+   * @return SUCCESS_STATUS if all paths pass; otherwise NO_PERMISSION_ERROR, carrying the
+   *     {@link AuthException} message when the failure was caused by an exception
+   */
+  @Override
+  public TSStatus checkUserPrivileges(String username, List<String> allPath, int permission) {
+    boolean checkStatus = true;
+    String checkMessage = null;
+    for (String path : allPath) {
+      try {
+        if (!checkOnePath(username, path, permission)) {
+          checkStatus = false;
+          break;
+        }
+      } catch (AuthException e) {
+        // Keep the reason so the caller can see why the check failed, and stop: the overall
+        // result can no longer become a success.
+        checkStatus = false;
+        checkMessage = e.getMessage();
+        break;
+      }
+    }
+    if (checkStatus) {
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    } else {
+      return RpcUtils.getStatus(TSStatusCode.NO_PERMISSION_ERROR, checkMessage);
+    }
+  }
+
+  /**
+   * Checks a single path, substituting the root privilege path when {@code path} is null.
+   *
+   * @param username user whose privileges are checked
+   * @param path path to check, may be {@code null}
+   * @param permission privilege id to check
+   * @return whether the local config node grants the permission
+   * @throws AuthException wrapping any {@link AuthException} raised by the check, after logging
+   */
+  private boolean checkOnePath(String username, String path, int permission) throws AuthException {
+    String effectivePath = (path != null) ? path : AuthUtils.ROOT_PATH_PRIVILEGE;
+    try {
+      return localConfigNode.checkUserPrivileges(username, effectivePath, permission);
+    } catch (AuthException e) {
+      logger.error("Error occurs when checking the seriesPath {} for user {}", path, username, e);
+      throw new AuthException(e);
+    }
+  }
+
+  /**
+   * Executes a permission-modifying request on the local config node.
+   *
+   * @param authorizerReq the request to execute
+   * @param configNodeClient not used by this implementation
+   * @return a completed future: SUCCESS_STATUS on success, or failed with the {@link
+   *     AuthException} raised by the local config node
+   */
+  @Override
+  public SettableFuture<ConfigTaskResult> operatePermission(
+      TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
+    SettableFuture<ConfigTaskResult> result = SettableFuture.create();
+    try {
+      LocalConfigNode.getInstance().operatorPermission(authorizerReq);
+      result.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+    } catch (AuthException e) {
+      result.setException(e);
+    }
+    return result;
+  }
+
+  /**
+   * Executes a permission query on the local config node.
+   *
+   * @param authorizerReq the query to execute
+   * @param configNodeClient not used by this implementation
+   * @return the future built by {@code AuthorizerManager.buildTSBlock} from the query result, or
+   *     a future failed with the {@link AuthException} raised by the local config node
+   */
+  @Override
+  public SettableFuture<ConfigTaskResult> queryPermission(
+      TAuthorizerReq authorizerReq, ConfigNodeClient configNodeClient) {
+    Map<String, List<String>> authorizerResp;
+    try {
+      authorizerResp = LocalConfigNode.getInstance().queryPermission(authorizerReq);
+    } catch (AuthException e) {
+      // Return the failed future itself; building a result from an empty map would hide the
+      // error behind an empty successful result.
+      SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+      future.setException(e);
+      return future;
+    }
+    return AuthorizerManager.getInstance().buildTSBlock(authorizerResp);
+  }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
index cd236234fe..3d25c73c25 100644
---
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
@@ -26,6 +26,13 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.auth.AuthException;
+import org.apache.iotdb.commons.auth.authorizer.BasicAuthorizer;
+import org.apache.iotdb.commons.auth.authorizer.IAuthorizer;
+import org.apache.iotdb.commons.auth.entity.PathPrivilege;
+import org.apache.iotdb.commons.auth.entity.PrivilegeType;
+import org.apache.iotdb.commons.auth.entity.Role;
+import org.apache.iotdb.commons.auth.entity.User;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -37,6 +44,8 @@ import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.AuthUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -59,6 +68,7 @@ import org.apache.iotdb.db.metadata.template.TemplateManager;
import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints;
+import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
@@ -119,6 +129,8 @@ public class LocalConfigNode {
SeriesPartitionExecutor.getSeriesPartitionExecutor(
config.getSeriesPartitionExecutorClass(),
config.getSeriesPartitionSlotNum());
+ private IAuthorizer iAuthorizer;
+
private LocalConfigNode() {
String schemaDir = config.getSchemaDir();
File schemaFolder = SystemFileFactory.INSTANCE.getFile(schemaDir);
@@ -129,6 +141,11 @@ public class LocalConfigNode {
logger.error("create system folder {} failed.",
schemaFolder.getAbsolutePath());
}
}
+ try {
+ iAuthorizer = BasicAuthorizer.getInstance();
+ } catch (AuthException e) {
+ logger.error(e.getMessage());
+ }
}
// region LocalSchemaConfigManager SingleTone
@@ -1009,4 +1026,221 @@ public class LocalConfigNode {
}
// endregion
+
+ // author
+  /**
+   * Applies a permission-modifying request to the local authorizer.
+   *
+   * <p>Grant and revoke requests are applied one privilege id at a time, on the node name carried
+   * by the request.
+   *
+   * @param authorizerReq request whose author type selects the operation
+   * @throws AuthException if the authorizer rejects the operation or the author type is not a
+   *     supported modifying operation
+   */
+  public void operatorPermission(TAuthorizerReq authorizerReq) throws AuthException {
+    AuthorOperator.AuthorType operation =
+        AuthorOperator.AuthorType.values()[authorizerReq.getAuthorType()];
+    switch (operation) {
+      case UPDATE_USER:
+        iAuthorizer.updateUserPassword(authorizerReq.getUserName(), authorizerReq.getNewPassword());
+        break;
+      case CREATE_USER:
+        iAuthorizer.createUser(authorizerReq.getUserName(), authorizerReq.getPassword());
+        break;
+      case CREATE_ROLE:
+        iAuthorizer.createRole(authorizerReq.getRoleName());
+        break;
+      case DROP_USER:
+        iAuthorizer.deleteUser(authorizerReq.getUserName());
+        break;
+      case DROP_ROLE:
+        iAuthorizer.deleteRole(authorizerReq.getRoleName());
+        break;
+      case GRANT_ROLE:
+        for (int privilegeId : authorizerReq.getPermissions()) {
+          iAuthorizer.grantPrivilegeToRole(
+              authorizerReq.getRoleName(), authorizerReq.getNodeName(), privilegeId);
+        }
+        break;
+      case GRANT_USER:
+        for (int privilegeId : authorizerReq.getPermissions()) {
+          iAuthorizer.grantPrivilegeToUser(
+              authorizerReq.getUserName(), authorizerReq.getNodeName(), privilegeId);
+        }
+        break;
+      case GRANT_ROLE_TO_USER:
+        iAuthorizer.grantRoleToUser(authorizerReq.getRoleName(), authorizerReq.getUserName());
+        break;
+      case REVOKE_USER:
+        for (int privilegeId : authorizerReq.getPermissions()) {
+          iAuthorizer.revokePrivilegeFromUser(
+              authorizerReq.getUserName(), authorizerReq.getNodeName(), privilegeId);
+        }
+        break;
+      case REVOKE_ROLE:
+        for (int privilegeId : authorizerReq.getPermissions()) {
+          iAuthorizer.revokePrivilegeFromRole(
+              authorizerReq.getRoleName(), authorizerReq.getNodeName(), privilegeId);
+        }
+        break;
+      case REVOKE_ROLE_FROM_USER:
+        iAuthorizer.revokeRoleFromUser(authorizerReq.getRoleName(), authorizerReq.getUserName());
+        break;
+      default:
+        throw new AuthException("Unsupported operation " + operation);
+    }
+  }
+
+  /**
+   * Dispatches a permission query to the matching {@code executeList*} method.
+   *
+   * @param authorizerReq request whose author type selects the query
+   * @return column name mapped to the list of values for that column
+   * @throws AuthException if the query fails or the author type is not a supported query
+   */
+  public Map<String, List<String>> queryPermission(TAuthorizerReq authorizerReq)
+      throws AuthException {
+    AuthorOperator.AuthorType queryKind =
+        AuthorOperator.AuthorType.values()[authorizerReq.getAuthorType()];
+    switch (queryKind) {
+      case LIST_USER:
+        return executeListUser();
+      case LIST_ROLE:
+        return executeListRole();
+      case LIST_USER_PRIVILEGE:
+        return executeListUserPrivileges(authorizerReq);
+      case LIST_ROLE_PRIVILEGE:
+        return executeListRolePrivileges(authorizerReq);
+      case LIST_USER_ROLES:
+        return executeListUserRoles(authorizerReq);
+      case LIST_ROLE_USERS:
+        return executeListRoleUsers(authorizerReq);
+      default:
+        throw new AuthException("Unsupported operation " + queryKind);
+    }
+  }
+
+  /** Lists all roles known to the authorizer under the {@code COLUMN_ROLE} key. */
+  public Map<String, List<String>> executeListRole() {
+    List<String> allRoles = iAuthorizer.listAllRoles();
+    Map<String, List<String>> result = new HashMap<>();
+    result.put(IoTDBConstant.COLUMN_ROLE, allRoles);
+    return result;
+  }
+
+  /** Lists all users known to the authorizer under the {@code COLUMN_USER} key. */
+  public Map<String, List<String>> executeListUser() {
+    List<String> allUsers = iAuthorizer.listAllUsers();
+    Map<String, List<String>> result = new HashMap<>();
+    result.put(IoTDBConstant.COLUMN_USER, allUsers);
+    return result;
+  }
+
+  /**
+   * Lists the users that hold the role named in the request, under the {@code COLUMN_USER} key.
+   *
+   * @param authorizerReq request carrying the role name
+   * @return column name mapped to the matching user names
+   * @throws AuthException if the role does not exist or the authorizer fails
+   */
+  public Map<String, List<String>> executeListRoleUsers(TAuthorizerReq authorizerReq)
+      throws AuthException {
+    Map<String, List<String>> permissionInfo = new HashMap<>();
+    String requestedRole = authorizerReq.getRoleName();
+    try {
+      if (iAuthorizer.getRole(requestedRole) == null) {
+        throw new AuthException("No such role : " + requestedRole);
+      }
+    } catch (AuthException e) {
+      throw new AuthException(e);
+    }
+
+    List<String> holders = new ArrayList<>();
+    for (String candidateName : iAuthorizer.listAllUsers()) {
+      User candidate = iAuthorizer.getUser(candidateName);
+      if (candidate == null) {
+        continue;
+      }
+      if (candidate.hasRole(requestedRole)) {
+        holders.add(candidateName);
+      }
+    }
+    permissionInfo.put(IoTDBConstant.COLUMN_USER, holders);
+    return permissionInfo;
+  }
+
+  /**
+   * Lists the roles of the user named in the request, under the {@code COLUMN_ROLE} key.
+   *
+   * @param authorizerReq request carrying the user name
+   * @return column name mapped to a copy of the user's role names
+   * @throws AuthException if the user does not exist or the authorizer fails
+   */
+  public Map<String, List<String>> executeListUserRoles(TAuthorizerReq authorizerReq)
+      throws AuthException {
+    Map<String, List<String>> permissionInfo = new HashMap<>();
+    String requestedUser = authorizerReq.getUserName();
+    User user;
+    try {
+      user = iAuthorizer.getUser(requestedUser);
+      if (user == null) {
+        throw new AuthException("No such user : " + requestedUser);
+      }
+    } catch (AuthException e) {
+      throw new AuthException(e);
+    }
+    List<String> rolesOfUser = new ArrayList<>(user.getRoleList());
+    permissionInfo.put(IoTDBConstant.COLUMN_ROLE, rolesOfUser);
+    return permissionInfo;
+  }
+
+ public Map<String, List<String>> executeListRolePrivileges(TAuthorizerReq
authorizerReq)
+ throws AuthException {
+ Map<String, List<String>> permissionInfo = new HashMap<>();
+ Role role;
+ try {
+ role = iAuthorizer.getRole(authorizerReq.getRoleName());
+ if (role == null) {
+ throw new AuthException("No such role : " +
authorizerReq.getRoleName());
+ }
+ } catch (AuthException e) {
+ throw new AuthException(e);
+ }
+ List<String> rolePrivilegesList = new ArrayList<>();
+ for (PathPrivilege pathPrivilege : role.getPrivilegeList()) {
+ if (authorizerReq.getNodeName().equals("")
+ || AuthUtils.pathBelongsTo(authorizerReq.getNodeName(),
pathPrivilege.getPath())) {
+ rolePrivilegesList.add(pathPrivilege.toString());
+ }
+ }
+
+ permissionInfo.put(IoTDBConstant.COLUMN_PRIVILEGE, rolePrivilegesList);
+ return permissionInfo;
+ }
+
+  /**
+   * Lists the privileges of the user named in the request.
+   *
+   * <p>For the user whose name equals {@code IoTDBConstant.PATH_ROOT}, every {@link
+   * PrivilegeType} is listed under {@code COLUMN_PRIVILEGE} and no role column is produced. For
+   * any other user, the result holds two parallel lists: {@code COLUMN_PRIVILEGE} with the
+   * selected privileges and {@code COLUMN_ROLE} with the role each privilege comes from (an
+   * empty string for privileges granted directly to the user). When the request's node name is
+   * non-empty, only privileges for which {@code AuthUtils.pathBelongsTo(nodeName,
+   * privilegePath)} holds are selected.
+   *
+   * @param authorizerReq request carrying the user name and node name
+   * @return column name mapped to the list of values for that column
+   * @throws AuthException if the user does not exist or the authorizer fails
+   */
+  public Map<String, List<String>> executeListUserPrivileges(TAuthorizerReq authorizerReq)
+      throws AuthException {
+    Map<String, List<String>> permissionInfo = new HashMap<>();
+    User user;
+    try {
+      user = iAuthorizer.getUser(authorizerReq.getUserName());
+      if (user == null) {
+        throw new AuthException("No such user : " + authorizerReq.getUserName());
+      }
+    } catch (AuthException e) {
+      throw new AuthException(e);
+    }
+    List<String> userPrivilegesList = new ArrayList<>();
+
+    if (IoTDBConstant.PATH_ROOT.equals(authorizerReq.getUserName())) {
+      for (PrivilegeType privilegeType : PrivilegeType.values()) {
+        userPrivilegesList.add(privilegeType.toString());
+      }
+    } else {
+      List<String> rolePrivileges = new ArrayList<>();
+      for (PathPrivilege pathPrivilege : user.getPrivilegeList()) {
+        if (authorizerReq.getNodeName().equals("")
+            || AuthUtils.pathBelongsTo(authorizerReq.getNodeName(), pathPrivilege.getPath())) {
+          rolePrivileges.add("");
+          userPrivilegesList.add(pathPrivilege.toString());
+        }
+      }
+      for (String roleN : user.getRoleList()) {
+        Role role = iAuthorizer.getRole(roleN);
+        // Skip role names that no longer resolve to a role. The check must be on the looked-up
+        // role, not on its name, or the dereference below throws a NullPointerException.
+        if (role == null) {
+          continue;
+        }
+        for (PathPrivilege pathPrivilege : role.getPrivilegeList()) {
+          if (authorizerReq.getNodeName().equals("")
+              || AuthUtils.pathBelongsTo(authorizerReq.getNodeName(), pathPrivilege.getPath())) {
+            rolePrivileges.add(roleN);
+            userPrivilegesList.add(pathPrivilege.toString());
+          }
+        }
+      }
+      permissionInfo.put(IoTDBConstant.COLUMN_ROLE, rolePrivileges);
+    }
+    permissionInfo.put(IoTDBConstant.COLUMN_PRIVILEGE, userPrivilegesList);
+    return permissionInfo;
+  }
+
+ /**
+  * Delegates authentication to the authorizer.
+  *
+  * @param username user name to authenticate
+  * @param password password supplied by the client
+  * @return the result of {@code iAuthorizer.login}
+  * @throws AuthException if the authorizer raises it
+  */
+ public boolean login(String username, String password) throws AuthException {
+ return iAuthorizer.login(username, password);
+ }
+
+ /**
+  * Delegates a single-path privilege check to the authorizer.
+  *
+  * @param username user whose privileges are checked
+  * @param path path the permission is required on
+  * @param permission privilege id to check
+  * @return the result of {@code iAuthorizer.checkUserPrivileges}
+  * @throws AuthException if the authorizer raises it
+  */
+ public boolean checkUserPrivileges(String username, String path, int
permission)
+ throws AuthException {
+ return iAuthorizer.checkUserPrivileges(username, path, permission);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerConfigTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerTask.java
similarity index 73%
rename from
server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerConfigTask.java
rename to
server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerTask.java
index 3193464919..375ffe0600 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerConfigTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/AuthorizerTask.java
@@ -26,34 +26,38 @@ import
org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.db.auth.AuthorizerManager;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-public class AuthorizerConfigTask implements IConfigTask {
+public class AuthorizerTask implements IConfigTask {
- private static final Logger LOGGER =
LoggerFactory.getLogger(AuthorizerConfigTask.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AuthorizerTask.class);
+
+ private static IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
private AuthorStatement authorStatement;
private AuthorizerManager authorizerManager =
AuthorizerManager.getInstance();
- public AuthorizerConfigTask(AuthorStatement authorStatement) {
+ public AuthorizerTask(AuthorStatement authorStatement) {
this.authorStatement = authorStatement;
}
@Override
public ListenableFuture<ConfigTaskResult> execute(
IClientManager<PartitionRegionId, ConfigNodeClient> clientManager) {
- SettableFuture<ConfigTaskResult> future = null;
- try (ConfigNodeClient configNodeClient =
- clientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
// Construct request using statement
TAuthorizerReq req =
new TAuthorizerReq(
@@ -66,19 +70,29 @@ public class AuthorizerConfigTask implements IConfigTask {
authorStatement.getNodeName() == null
? ""
: authorStatement.getNodeName().getFullPath());
-
// Send request to some API server
- if (authorStatement.getQueryType() == QueryType.WRITE) {
- future = authorizerManager.operatePermission(req, configNodeClient);
+ if (config.isClusterMode()) {
+ try (ConfigNodeClient configNodeClient =
+ clientManager.borrowClient(ConfigNodeInfo.partitionRegionId); ) {
+ if (authorStatement.getQueryType() == QueryType.WRITE) {
+ future = authorizerManager.operatePermission(req,
configNodeClient);
+ } else {
+ future = authorizerManager.queryPermission(req, configNodeClient);
+ }
+ }
} else {
- future = authorizerManager.queryPermission(req, configNodeClient);
+ if (authorStatement.getQueryType() == QueryType.WRITE) {
+ future = authorizerManager.operatePermission(req, null);
+ } else {
+ future = authorizerManager.queryPermission(req, null);
+ }
}
- } catch (AuthException e) {
- LOGGER.error("No such privilege {}.", authorStatement.getAuthorType());
- future.setException(e);
- } catch (IOException e) {
+
+ } catch (IOException | TException e) {
LOGGER.error("can't connect to all config nodes", e);
future.setException(e);
+ } catch (AuthException e) {
+ future.setException(e);
}
// If the action is executed successfully, return the Future.
// If your operation is async, you can return the corresponding future
directly.
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index 27311d6a7f..38b3ac1d18 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -96,7 +96,7 @@ public class ConfigTaskVisitor
@Override
public IConfigTask visitAuthor(AuthorStatement statement, TaskContext
context) {
- return new AuthorizerConfigTask(statement);
+ return new AuthorizerTask(statement);
}
@Override
diff --git
a/server/src/test/java/org/apache/iotdb/db/auth/AuthorizerManagerTest.java
b/server/src/test/java/org/apache/iotdb/db/auth/AuthorizerManagerTest.java
index 2dfa147efc..16f1305c84 100644
--- a/server/src/test/java/org/apache/iotdb/db/auth/AuthorizerManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/auth/AuthorizerManagerTest.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.auth;
-import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.auth.entity.PathPrivilege;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.auth.entity.Role;
@@ -27,7 +26,6 @@ import org.apache.iotdb.commons.auth.entity.User;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TRoleResp;
import org.apache.iotdb.confignode.rpc.thrift.TUserResp;
-import org.apache.iotdb.rpc.ConfigNodeConnectionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
@@ -43,10 +41,10 @@ import java.util.Set;
public class AuthorizerManagerTest {
- AuthorizerManager authorizerManager = AuthorizerManager.getInstance();
+ ClusterAuthorityFetcher authorityFetcher =
ClusterAuthorityFetcher.getInstance();
@Test
- public void permissionCacheTest() throws ConfigNodeConnectionException,
AuthException {
+ public void permissionCacheTest() {
User user = new User();
Role role1 = new Role();
Role role2 = new Role();
@@ -92,8 +90,8 @@ public class AuthorizerManagerTest {
result.setRoleInfo(new HashMap<>());
// User authentication permission without role
- authorizerManager.getUserCache().put(user.getName(),
authorizerManager.cacheUser(result));
- User user1 = authorizerManager.getUserCache().getIfPresent(user.getName());
+ authorityFetcher.getUserCache().put(user.getName(),
authorityFetcher.cacheUser(result));
+ User user1 = authorityFetcher.getUserCache().getIfPresent(user.getName());
assert user1 != null;
Assert.assertEquals(user.getName(), user1.getName());
Assert.assertEquals(user.getPassword(), user1.getPassword());
@@ -102,20 +100,20 @@ public class AuthorizerManagerTest {
// User has permission
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- authorizerManager
- .checkPermissionCache(
+ authorityFetcher
+ .checkUserPrivileges(
"user", Collections.singletonList("root.ln"),
PrivilegeType.CREATE_ROLE.ordinal())
.getCode());
// User does not have permission
Assert.assertEquals(
TSStatusCode.NO_PERMISSION_ERROR.getStatusCode(),
- authorizerManager
- .checkPermissionCache(
+ authorityFetcher
+ .checkUserPrivileges(
"user", Collections.singletonList("root.ln"),
PrivilegeType.CREATE_USER.ordinal())
.getCode());
// Authenticate users with roles
- authorizerManager.invalidateCache(user.getName(), "");
+ authorityFetcher.invalidateCache(user.getName(), "");
tUserResp.setPrivilegeList(new ArrayList<>());
tUserResp.setRoleList(user.getRoleList());
@@ -133,30 +131,30 @@ public class AuthorizerManagerTest {
tRoleRespMap.put(role.getName(), tRoleResp);
}
result.setRoleInfo(tRoleRespMap);
- authorizerManager.getUserCache().put(user.getName(),
authorizerManager.cacheUser(result));
- Role role3 =
authorizerManager.getRoleCache().getIfPresent(role1.getName());
+ authorityFetcher.getUserCache().put(user.getName(),
authorityFetcher.cacheUser(result));
+ Role role3 = authorityFetcher.getRoleCache().getIfPresent(role1.getName());
Assert.assertEquals(role1.getName(), role3.getName());
Assert.assertEquals(role1.getPrivilegeList(), role3.getPrivilegeList());
// role has permission
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- authorizerManager
- .checkPermissionCache(
+ authorityFetcher
+ .checkUserPrivileges(
"user", Collections.singletonList("root.ln"),
PrivilegeType.CREATE_ROLE.ordinal())
.getCode());
// role does not have permission
Assert.assertEquals(
TSStatusCode.NO_PERMISSION_ERROR.getStatusCode(),
- authorizerManager
- .checkPermissionCache(
+ authorityFetcher
+ .checkUserPrivileges(
"user", Collections.singletonList("root.ln"),
PrivilegeType.CREATE_USER.ordinal())
.getCode());
- authorizerManager.invalidateCache(user.getName(), "");
+ authorityFetcher.invalidateCache(user.getName(), "");
- user1 = authorizerManager.getUserCache().getIfPresent(user.getName());
- role1 = authorizerManager.getRoleCache().getIfPresent(role1.getName());
+ user1 = authorityFetcher.getUserCache().getIfPresent(user.getName());
+ role1 = authorityFetcher.getRoleCache().getIfPresent(role1.getName());
Assert.assertNull(user1);
Assert.assertNull(role1);
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
index 46ceab5ef3..a3dfc999e6 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
@@ -124,6 +124,14 @@ public class StandaloneCoordinatorTest {
executeStatement(insertStmt);
}
+ @Test
+ public void createUser() {
+ String createUserSql = "create user username 'password'";
+ Statement createStmt =
+ StatementGenerator.createStatement(createUserSql,
ZoneId.systemDefault());
+ executeStatement(createStmt);
+ }
+
private void executeStatement(Statement statement) {
long queryId = SessionManager.getInstance().requestQueryId(false);
ExecutionResult executionResult =