This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch refactor_author_query in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 942f630f6ff4eac91cf1d0ab948969c63ffec258 Author: 江天 <[email protected]> AuthorDate: Thu May 23 13:07:29 2019 +0800 use a more elegant way to return results of Authorization queries like 'LIST USERS' --- .../org/apache/iotdb/db/conf/IoTDBConstant.java | 4 + .../db/qp/executor/IQueryProcessExecutor.java | 3 +- .../iotdb/db/qp/executor/OverflowQPExecutor.java | 263 +++++++++++++-------- .../iotdb/db/qp/executor/QueryProcessExecutor.java | 17 +- .../apache/iotdb/db/qp/physical/PhysicalPlan.java | 4 + .../iotdb/db/qp/physical/sys/AuthorPlan.java | 6 + .../apache/iotdb/db/query/dataset/AuthDataSet.java | 53 +++++ .../org/apache/iotdb/db/service/TSServiceImpl.java | 197 +++++++++------ .../apache/iotdb/db/qp/utils/MemIntQpExecutor.java | 6 + 9 files changed, 383 insertions(+), 170 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java index 48e927f..e2775cd 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java @@ -59,4 +59,8 @@ public class IoTDBConstant { // for cluster, set read consistency level public static final String SET_READ_CONSISTENCY_LEVEL_PATTERN = "set\\s+read.*level.*"; + + public static final String ROLE = "Role"; + public static final String USER = "User"; + public static final String PRIVILEGE = "Privilege"; } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java index 920aeef..6377f44 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java @@ -25,7 +25,6 @@ import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.ProcessorException; import org.apache.iotdb.db.qp.physical.PhysicalPlan; -import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.fill.IFill; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; @@ -51,7 +50,7 @@ public interface IQueryProcessExecutor { * @param queryPlan QueryPlan * @return QueryDataSet */ - QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context) + QueryDataSet processQuery(PhysicalPlan queryPlan, QueryContext context) throws IOException, FileNodeManagerException, PathErrorException, QueryFilterOptimizationException, ProcessorException; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java index 9ca337e..e772bf5 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java @@ -18,6 +18,10 @@ */ package org.apache.iotdb.db.qp.executor; +import static org.apache.iotdb.db.conf.IoTDBConstant.PRIVILEGE; +import static org.apache.iotdb.db.conf.IoTDBConstant.ROLE; +import static org.apache.iotdb.db.conf.IoTDBConstant.USER; + import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; @@ -41,6 +45,7 @@ import org.apache.iotdb.db.metadata.MNode; import org.apache.iotdb.db.monitor.MonitorConstants; import org.apache.iotdb.db.qp.constant.SQLConstant; import org.apache.iotdb.db.qp.logical.sys.AuthorOperator; +import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType; import org.apache.iotdb.db.qp.logical.sys.MetadataOperator; import org.apache.iotdb.db.qp.logical.sys.PropertyOperator; import org.apache.iotdb.db.qp.physical.PhysicalPlan; @@ -52,6 +57,7 @@ import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan; import org.apache.iotdb.db.qp.physical.sys.MetadataPlan; import org.apache.iotdb.db.qp.physical.sys.PropertyPlan; import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.dataset.AuthDataSet; import org.apache.iotdb.db.query.executor.EngineQueryRouter; import org.apache.iotdb.db.query.fill.IFill; import org.apache.iotdb.db.utils.AuthUtils; @@ -60,9 +66,12 @@ import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.common.Field; import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; +import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; @@ -132,14 +141,16 @@ public class OverflowQPExecutor extends QueryProcessExecutor { case GRANT_USER_ROLE: case MODIFY_PASSWORD: case DELETE_USER: + AuthorPlan author = (AuthorPlan) plan; + return operateAuthor(author); case LIST_ROLE: case LIST_USER: case LIST_ROLE_PRIVILEGE: case LIST_ROLE_USERS: case LIST_USER_PRIVILEGE: case LIST_USER_ROLES: - AuthorPlan author = (AuthorPlan) plan; - return operateAuthor(author); + throw new ProcessorException(String.format("Author query %s is now allowed" + + " in processNonQuery", plan.getOperatorType())); case LOADDATA: LoadDataPlan loadData = (LoadDataPlan) plan; LoadDataUtils load = new LoadDataUtils(); @@ -312,15 +323,12 @@ public class OverflowQPExecutor extends QueryProcessExecutor { String newPassword = author.getNewPassword(); Set<Integer> permissions = author.getPermissions(); Path nodeName = author.getNodeName(); - IAuthorizer authorizer = null; + IAuthorizer authorizer; try { authorizer = LocalFileAuthorizer.getInstance(); } catch (AuthException e) { throw new ProcessorException(e); } - StringBuilder msg; - List<String> roleList; - List<String> userList; try { switch (authorType) { case UPDATE_USER: @@ -394,94 +402,6 @@ public class OverflowQPExecutor extends QueryProcessExecutor { throw new ProcessorException("User " + userName + " does not have role " + roleName); } return true; - case LIST_ROLE: - roleList = authorizer.listAllRoles(); - msg = new StringBuilder("Roles are : [ \n"); - for (String role : roleList) { - msg.append(role).append("\n"); - } - msg.append("]"); - // TODO : use a more elegant way to pass message. - throw new ProcessorException(msg.toString()); - case LIST_USER: - userList = authorizer.listAllUsers(); - msg = new StringBuilder("Users are : [ \n"); - for (String user : userList) { - msg.append(user).append("\n"); - } - msg.append("]"); - throw new ProcessorException(msg.toString()); - case LIST_ROLE_USERS: - Role role = authorizer.getRole(roleName); - if (role == null) { - throw new ProcessorException("No such role : " + roleName); - } - userList = authorizer.listAllUsers(); - msg = new StringBuilder("Users are : [ \n"); - for (String userN : userList) { - User userObj = authorizer.getUser(userN); - if (userObj != null && userObj.hasRole(roleName)) { - msg.append(userN).append("\n"); - } - } - msg.append("]"); - throw new ProcessorException(msg.toString()); - case LIST_USER_ROLES: - msg = new StringBuilder("Roles are : [ \n"); - User user = authorizer.getUser(userName); - if (user != null) { - for (String roleN : user.getRoleList()) { - msg.append(roleN).append("\n"); - } - } else { - throw new ProcessorException("No such user : " + userName); - } - msg.append("]"); - throw new ProcessorException(msg.toString()); - case LIST_ROLE_PRIVILEGE: - msg = new StringBuilder("Privileges are : [ \n"); - role = authorizer.getRole(roleName); - if (role != null) { - for (PathPrivilege pathPrivilege : role.getPrivilegeList()) { - if (nodeName == null || AuthUtils - .pathBelongsTo(nodeName.getFullPath(), pathPrivilege.getPath())) { - msg.append(pathPrivilege.toString()); - } - } - } else { - throw new ProcessorException("No such role : " + roleName); - } - msg.append("]"); - throw new ProcessorException(msg.toString()); - case LIST_USER_PRIVILEGE: - user = authorizer.getUser(userName); - if (user == null) { - throw new ProcessorException("No such user : " + userName); - } - msg = new StringBuilder("Privileges are : [ \n"); - msg.append("From itself : {\n"); - for (PathPrivilege pathPrivilege : user.getPrivilegeList()) { - if (nodeName == null || AuthUtils - .pathBelongsTo(nodeName.getFullPath(), pathPrivilege.getPath())) { - msg.append(pathPrivilege.toString()); - } - } - msg.append("}\n"); - for (String roleN : user.getRoleList()) { - role = authorizer.getRole(roleN); - if (role != null) { - msg.append("From role ").append(roleN).append(" : {\n"); - for (PathPrivilege pathPrivilege : role.getPrivilegeList()) { - if (nodeName == null - || AuthUtils.pathBelongsTo(nodeName.getFullPath(), pathPrivilege.getPath())) { - msg.append(pathPrivilege.toString()); - } - } - msg.append("}\n"); - } - } - msg.append("]"); - throw new ProcessorException(msg.toString()); default: throw new ProcessorException("Unsupported operation " + authorType); } @@ -679,4 +599,159 @@ public class OverflowQPExecutor extends QueryProcessExecutor { } return true; } + + @Override + protected QueryDataSet processAuthorQuery(AuthorPlan plan, QueryContext context) + throws ProcessorException { + AuthorType authorType = plan.getAuthorType(); + String userName = plan.getUserName(); + String roleName = plan.getRoleName(); + Path nodeName = plan.getNodeName(); + IAuthorizer authorizer; + try { + authorizer = LocalFileAuthorizer.getInstance(); + } catch (AuthException e) { + throw new ProcessorException(e); + } + List<Path> headerList = new ArrayList<>(); + List<TSDataType> typeList = new ArrayList<>(); + List<String> roleList; + List<String> userList; + AuthDataSet dataSet; + int index = 0; + try { + switch (authorType) { + case LIST_ROLE: + headerList.add(new Path(ROLE)); + typeList.add(TSDataType.TEXT); + dataSet = new AuthDataSet(headerList, typeList); + roleList = authorizer.listAllRoles(); + for (String role : roleList) { + RowRecord record = new RowRecord(index++); + Field field = new Field(TSDataType.TEXT); + field.setBinaryV(new Binary(role)); + record.addField(field); + dataSet.putRecord(record); + } + break; + case LIST_USER: + userList = authorizer.listAllUsers(); + headerList.add(new Path(USER)); + typeList.add(TSDataType.TEXT); + dataSet = new AuthDataSet(headerList, typeList); + for (String user : userList) { + RowRecord record = new RowRecord(index++); + Field field = new Field(TSDataType.TEXT); + field.setBinaryV(new Binary(user)); + record.addField(field); + dataSet.putRecord(record); + } + break; + case LIST_ROLE_USERS: + Role role = authorizer.getRole(roleName); + if (role == null) { + throw new ProcessorException("No such role : " + roleName); + } + headerList.add(new Path(USER)); + typeList.add(TSDataType.TEXT); + dataSet = new AuthDataSet(headerList, typeList); + userList = authorizer.listAllUsers(); + for (String userN : userList) { + User userObj = authorizer.getUser(userN); + if (userObj != null && userObj.hasRole(roleName)) { + RowRecord record = new RowRecord(index++); + Field field = new Field(TSDataType.TEXT); + field.setBinaryV(new Binary(userN)); + record.addField(field); + dataSet.putRecord(record); + } + } + break; + case LIST_USER_ROLES: + User user = authorizer.getUser(userName); + if (user != null) { + headerList.add(new Path(ROLE)); + typeList.add(TSDataType.TEXT); + dataSet = new AuthDataSet(headerList, typeList); + for (String roleN : user.getRoleList()) { + RowRecord record = new RowRecord(index++); + Field field = new Field(TSDataType.TEXT); + field.setBinaryV(new Binary(roleN)); + record.addField(field); + dataSet.putRecord(record); + } + } else { + throw new ProcessorException("No such user : " + userName); + } + break; + case LIST_ROLE_PRIVILEGE: + role = authorizer.getRole(roleName); + if (role != null) { + headerList.add(new Path(PRIVILEGE)); + typeList.add(TSDataType.TEXT); + dataSet = new AuthDataSet(headerList, typeList); + for (PathPrivilege pathPrivilege : role.getPrivilegeList()) { + if (nodeName == null || AuthUtils + .pathBelongsTo(nodeName.getFullPath(), pathPrivilege.getPath())) { + RowRecord record = new RowRecord(index++); + Field field = new Field(TSDataType.TEXT); + field.setBinaryV(new Binary(pathPrivilege.toString())); + record.addField(field); + dataSet.putRecord(record); + } + } + } else { + throw new ProcessorException("No such role : " + roleName); + } + break; + case LIST_USER_PRIVILEGE: + user = authorizer.getUser(userName); + if (user == null) { + throw new ProcessorException("No such user : " + userName); + } + headerList.add(new Path(ROLE)); + headerList.add(new Path(PRIVILEGE)); + typeList.add(TSDataType.TEXT); + typeList.add(TSDataType.TEXT); + dataSet = new AuthDataSet(headerList, typeList); + for (PathPrivilege pathPrivilege : user.getPrivilegeList()) { + if (nodeName == null || AuthUtils + .pathBelongsTo(nodeName.getFullPath(), pathPrivilege.getPath())) { + RowRecord record = new RowRecord(index++); + Field roleF = new Field(TSDataType.TEXT); + roleF.setBinaryV(new Binary("")); + record.addField(roleF); + Field privilegeF = new Field(TSDataType.TEXT); + privilegeF.setBinaryV(new Binary(pathPrivilege.toString())); + record.addField(privilegeF); + dataSet.putRecord(record); + } + } + for (String roleN : user.getRoleList()) { + role = authorizer.getRole(roleN); + if (role != null) { + for (PathPrivilege pathPrivilege : role.getPrivilegeList()) { + if (nodeName == null + || AuthUtils.pathBelongsTo(nodeName.getFullPath(), pathPrivilege.getPath())) { + RowRecord record = new RowRecord(index++); + Field roleF = new Field(TSDataType.TEXT); + roleF.setBinaryV(new Binary(roleN)); + record.addField(roleF); + Field privilegeF = new Field(TSDataType.TEXT); + privilegeF.setBinaryV(new Binary(pathPrivilege.toString())); + record.addField(privilegeF); + dataSet.putRecord(record); + } + } + } + } + break; + default: + throw new ProcessorException("Unsupported operation " + authorType); + } + } catch (AuthException e) { + throw new ProcessorException(e.getMessage()); + } + return dataSet; + } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java index 99476f2..b43e73a 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan; import org.apache.iotdb.db.qp.physical.crud.GroupByPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; +import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.executor.EngineQueryRouter; import org.apache.iotdb.db.query.executor.IEngineQueryRouter; @@ -46,10 +47,24 @@ public abstract class QueryProcessExecutor implements IQueryProcessExecutor { protected IEngineQueryRouter queryRouter = new EngineQueryRouter(); @Override - public QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context) + public QueryDataSet processQuery(PhysicalPlan queryPlan, QueryContext context) throws IOException, FileNodeManagerException, PathErrorException, QueryFilterOptimizationException, ProcessorException { + if (queryPlan instanceof QueryPlan) { + return processDataQuery((QueryPlan) queryPlan, context); + } else if (queryPlan instanceof AuthorPlan) { + return processAuthorQuery((AuthorPlan) queryPlan, context); + } else { + throw new ProcessorException(String.format("Unrecognized query plan %s", queryPlan)); + } + } + + protected abstract QueryDataSet processAuthorQuery(AuthorPlan plan, QueryContext context) + throws ProcessorException; + + private QueryDataSet processDataQuery(QueryPlan queryPlan, QueryContext context) + throws FileNodeManagerException, QueryFilterOptimizationException, PathErrorException, ProcessorException, IOException { QueryExpression queryExpression = QueryExpression.create().setSelectSeries(queryPlan.getPaths()) .setExpression(queryPlan.getExpression()); if (queryPlan instanceof GroupByPlan) { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java index d2f7bb9..07d1e75 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java @@ -75,4 +75,8 @@ public abstract class PhysicalPlan implements Serializable { public void setProposer(String proposer) { this.proposer = proposer; } + + public void setQuery(boolean query) { + isQuery = query; + } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java index 3ec812c..339f1e0 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java @@ -99,21 +99,27 @@ public class AuthorPlan extends PhysicalPlan { this.setOperatorType(Operator.OperatorType.REVOKE_USER_ROLE); break; case LIST_USER_PRIVILEGE: + this.setQuery(true); this.setOperatorType(Operator.OperatorType.LIST_USER_PRIVILEGE); break; case LIST_ROLE_PRIVILEGE: + this.setQuery(true); this.setOperatorType(Operator.OperatorType.LIST_ROLE_PRIVILEGE); break; case LIST_USER_ROLES: + this.setQuery(true); this.setOperatorType(Operator.OperatorType.LIST_USER_ROLES); break; case LIST_ROLE_USERS: + this.setQuery(true); this.setOperatorType(Operator.OperatorType.LIST_ROLE_USERS); break; case LIST_USER: + this.setQuery(true); this.setOperatorType(Operator.OperatorType.LIST_USER); break; case LIST_ROLE: + this.setQuery(true); this.setOperatorType(Operator.OperatorType.LIST_ROLE); break; default: diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AuthDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AuthDataSet.java new file mode 100644 index 0000000..1846f12 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AuthDataSet.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.dataset; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; + +public class AuthDataSet extends QueryDataSet { + + List<RowRecord> records = new ArrayList<>(); + int index = 0; + + public AuthDataSet(List<Path> paths, + List<TSDataType> dataTypes) { + super(paths, dataTypes); + } + + @Override + public boolean hasNext() throws IOException { + return index < records.size(); + } + + @Override + public RowRecord next() throws IOException { + return records.get(index++); + } + + public void putRecord(RowRecord newRecord) { + records.add(newRecord); + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index b02f1c2..d8d2ad5 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -18,6 +18,10 @@ */ package org.apache.iotdb.db.service; +import static org.apache.iotdb.db.conf.IoTDBConstant.PRIVILEGE; +import static org.apache.iotdb.db.conf.IoTDBConstant.ROLE; +import static org.apache.iotdb.db.conf.IoTDBConstant.USER; + import java.io.IOException; import java.nio.ByteBuffer; import java.sql.Statement; @@ -562,80 +566,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { PhysicalPlan plan = processor.parseSQLToPhysicalPlan(statement, zoneIds.get()); plan.setProposer(username.get()); - List<Path> paths; - paths = plan.getPaths(); - - // check seriesPath exists - if (paths.isEmpty()) { - return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, "Timeseries does not exist."); - } - - // check file level set - - try { - checkFileLevelSet(paths); - } catch (PathErrorException e) { - LOGGER.error("meet error while checking file level.", e); - return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage()); - } - - // check permissions - if (!checkAuthorization(paths, plan)) { - return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, - "No permissions for this query."); - } - - TSExecuteStatementResp resp = getTSExecuteStatementResp(TS_StatusCode.SUCCESS_STATUS, ""); + TSExecuteStatementResp resp; List<String> columns = new ArrayList<>(); - // Restore column header of aggregate to func(column_name), only - // support single aggregate function for now - if (plan instanceof QueryPlan) { - switch (plan.getOperatorType()) { - case QUERY: - case FILL: - for (Path p : paths) { - columns.add(p.getFullPath()); - } - break; - case AGGREGATION: - case GROUPBY: - List<String> aggregations = plan.getAggregations(); - if (aggregations.size() != paths.size()) { - for (int i = 1; i < paths.size(); i++) { - aggregations.add(aggregations.get(0)); - } - } - for (int i = 0; i < paths.size(); i++) { - columns.add(aggregations.get(i) + "(" + paths.get(i).getFullPath() + ")"); - } - break; - default: - throw new TException("unsupported query type: " + plan.getOperatorType()); - } + if (!(plan instanceof AuthorPlan)) { + resp = executeDataQuery(plan, columns); } else { - Operator.OperatorType type = plan.getOperatorType(); - switch (type) { - case QUERY: - case FILL: - for (Path p : paths) { - columns.add(p.getFullPath()); - } - break; - case AGGREGATION: - case GROUPBY: - List<String> aggregations = plan.getAggregations(); - if (aggregations.size() != paths.size()) { - for (int i = 1; i < paths.size(); i++) { - aggregations.add(aggregations.get(0)); - } - } - for (int i = 0; i < paths.size(); i++) { - columns.add(aggregations.get(i) + "(" + paths.get(i).getFullPath() + ")"); - } - break; - default: - throw new TException("not support " + type + " in new read process"); - } + resp = executeAuthQuery(plan, columns); } resp.setOperationType(plan.getOperatorType().toString()); @@ -654,6 +590,118 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } } + private TSExecuteStatementResp executeAuthQuery(PhysicalPlan plan, List<String> columns) { + TSExecuteStatementResp resp = getTSExecuteStatementResp(TS_StatusCode.SUCCESS_STATUS, ""); + AuthorPlan authorPlan = (AuthorPlan) plan; + switch (authorPlan.getAuthorType()) { + case LIST_ROLE: + columns.add(ROLE); + break; + case LIST_USER: + columns.add(USER); + break; + case LIST_ROLE_USERS: + columns.add(ROLE); + columns.add(USER); + break; + case LIST_USER_ROLES: + columns.add(USER); + columns.add(ROLE); + break; + case LIST_ROLE_PRIVILEGE: + columns.add(ROLE); + columns.add(PRIVILEGE); + break; + case LIST_USER_PRIVILEGE: + columns.add(USER); + columns.add(PRIVILEGE); + break; + default: + return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, String.format("%s is not an " + + "auth query", authorPlan.getAuthorType())); + } + return resp; + } + + private TSExecuteStatementResp executeDataQuery(PhysicalPlan plan, List<String> columns) + throws AuthException, TException { + List<Path> paths; + paths = plan.getPaths(); + + // check seriesPath exists + if (paths.isEmpty()) { + return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, "Timeseries does not exist."); + } + + // check file level set + + try { + checkFileLevelSet(paths); + } catch (PathErrorException e) { + LOGGER.error("meet error while checking file level.", e); + return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage()); + } + + // check permissions + if (!checkAuthorization(paths, plan)) { + return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, + "No permissions for this query."); + } + + TSExecuteStatementResp resp = getTSExecuteStatementResp(TS_StatusCode.SUCCESS_STATUS, ""); + // Restore column header of aggregate to func(column_name), only + // support single aggregate function for now + if (plan instanceof QueryPlan) { + switch (plan.getOperatorType()) { + case QUERY: + case FILL: + for (Path p : paths) { + columns.add(p.getFullPath()); + } + break; + case AGGREGATION: + case GROUPBY: + List<String> aggregations = plan.getAggregations(); + if (aggregations.size() != paths.size()) { + for (int i = 1; i < paths.size(); i++) { + aggregations.add(aggregations.get(0)); + } + } + for (int i = 0; i < paths.size(); i++) { + columns.add(aggregations.get(i) + "(" + paths.get(i).getFullPath() + ")"); + } + break; + default: + throw new TException("unsupported query type: " + plan.getOperatorType()); + } + } else { + Operator.OperatorType type = plan.getOperatorType(); + switch (type) { + case QUERY: + case FILL: + for (Path p : paths) { + columns.add(p.getFullPath()); + } + break; + case AGGREGATION: + case GROUPBY: + List<String> aggregations = plan.getAggregations(); + if (aggregations.size() != paths.size()) { + for (int i = 1; i < paths.size(); i++) { + aggregations.add(aggregations.get(0)); + } + } + for (int i = 0; i < paths.size(); i++) { + columns.add(aggregations.get(i) + "(" + paths.get(i).getFullPath() + ")"); + } + break; + default: + throw new TException("not support " + type + " in new read process"); + } + } + return resp; + } + protected void checkFileLevelSet(List<Path> paths) throws PathErrorException { MManager.getInstance().checkFileLevel(paths); } @@ -699,13 +747,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { PhysicalPlan physicalPlan = queryStatus.get().get(statement); processor.getExecutor().setFetchSize(fetchSize); + QueryDataSet queryDataSet; QueryContext context = new QueryContext(QueryResourceManager.getInstance().assignJobId()); initContextMap(); contextMapLocal.get().put(req.queryId, context); - QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan, + queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan, context); + queryRet.get().put(statement, queryDataSet); return queryDataSet; } @@ -897,5 +947,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MIN_TIME); return properties; } + } diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java index 71df644..7afced3 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.qp.physical.crud.UpdatePlan; +import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.executor.EngineQueryRouter; import org.apache.iotdb.db.query.fill.IFill; @@ -206,6 +207,11 @@ public class MemIntQpExecutor extends QueryProcessExecutor { return 0; } + @Override + protected QueryDataSet processAuthorQuery(AuthorPlan plan, QueryContext context) { + return null; + } + private class TestSeries { public TreeMap<Long, Integer> data = new TreeMap<>();
