This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch controlThreadNum in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bcaecba3b0b521a60aed30e0da8b03922b9baaf2 Author: Alima777 <[email protected]> AuthorDate: Wed Dec 29 16:17:01 2021 +0800 seperate showplan, queryplan and authorplan --- .../iotdb/db/qp/physical/crud/AggregationPlan.java | 2 +- .../iotdb/db/qp/physical/crud/LastQueryPlan.java | 8 +- .../iotdb/db/qp/physical/crud/QueryIndexPlan.java | 2 +- .../iotdb/db/qp/physical/crud/QueryPlan.java | 3 +- .../apache/iotdb/db/qp/physical/crud/UDTFPlan.java | 2 +- .../db/service/thrift/impl/TSServiceImpl.java | 246 +++++++++++---------- 6 files changed, 144 insertions(+), 119 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java index 6083605..c81e12e 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java @@ -81,7 +81,7 @@ public class AggregationPlan extends RawDataQueryPlan { @Override public List<TSDataType> getWideQueryHeaders( - List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList) + List<String> respColumns, List<String> respSgColumns, boolean isJdbcQuery, BitSet aliasList) throws MetadataException { List<TSDataType> seriesTypes = new ArrayList<>(); List<String> aggregations = getAggregations(); diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java index b2028bf..9ddcaae 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java @@ -35,7 +35,11 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.thrift.TException; -import java.util.*; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.HashSet; +import 
java.util.List; +import java.util.Set; public class LastQueryPlan extends RawDataQueryPlan { @@ -66,7 +70,7 @@ public class LastQueryPlan extends RawDataQueryPlan { @Override public List<TSDataType> getWideQueryHeaders( - List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList) + List<String> respColumns, List<String> respSgColumns, boolean isJdbcQuery, BitSet aliasList) throws TException { throw new TException("unsupported query type: " + getOperatorType()); } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java index 8ff816f..9d37dc0 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryIndexPlan.java @@ -41,7 +41,7 @@ public class QueryIndexPlan extends RawDataQueryPlan { @Override public List<TSDataType> getWideQueryHeaders( - List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList) + List<String> respColumns, List<String> respSgColumns, boolean isJdbcQuery, BitSet aliasList) throws TException { throw new TException("unsupported query type: " + getOperatorType()); } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java index 6427137..1efa3b9 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java @@ -71,6 +71,7 @@ public abstract class QueryPlan extends PhysicalPlan { public abstract void deduplicate(PhysicalGenerator physicalGenerator) throws MetadataException; + /** Construct TSExecuteStatementResp and the header of result set. 
*/ public TSExecuteStatementResp getTSExecuteStatementResp(boolean isJdbcQuery) throws TException, MetadataException { List<String> respColumns = new ArrayList<>(); @@ -97,7 +98,7 @@ public abstract class QueryPlan extends PhysicalPlan { } public List<TSDataType> getWideQueryHeaders( - List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList) + List<String> respColumns, List<String> respSgColumns, boolean isJdbcQuery, BitSet aliasList) throws TException, MetadataException { List<TSDataType> seriesTypes = new ArrayList<>(); for (int i = 0; i < resultColumns.size(); ++i) { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java index 37c46e3..401512a 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java @@ -91,7 +91,7 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan { @Override public List<TSDataType> getWideQueryHeaders( - List<String> respColumns, List<String> respSgColumns, Boolean isJdbcQuery, BitSet aliasList) { + List<String> respColumns, List<String> respSgColumns, boolean isJdbcQuery, BitSet aliasList) { List<TSDataType> seriesTypes = new ArrayList<>(); for (int i = 0; i < paths.size(); i++) { respColumns.add(resultColumns.get(i).getResultColumnName()); diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java index 1762ad6..50395ce 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java @@ -37,7 +37,6 @@ import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.metadata.template.TemplateQueryType; import 
org.apache.iotdb.db.qp.logical.Operator.OperatorType; import org.apache.iotdb.db.qp.physical.PhysicalPlan; -import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; @@ -48,7 +47,6 @@ import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan; import org.apache.iotdb.db.qp.physical.crud.UDFPlan; import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan; -import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan; @@ -58,12 +56,10 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan; import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan; import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan; -import org.apache.iotdb.db.qp.physical.sys.ShowPlan; import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan; import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.tracing.TracingConstant; -import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet; import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet; import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet; import org.apache.iotdb.db.service.IoTDB; @@ -591,8 +587,9 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If } /** - * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, UDFPlan, - * some AuthorPlan + * Execute query statement, return TSExecuteStatementResp with dataset. 
+ * + * @param plan must be a plan for Query: QueryPlan, ShowPlan, and some AuthorPlan */ @SuppressWarnings({"squid:S3776", "squid:S1141"}) // Suppress high Cognitive Complexity warning private TSExecuteStatementResp internalExecuteQueryStatement( @@ -607,6 +604,16 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If throws QueryProcessException, SQLException, StorageEngineException, QueryFilterOptimizationException, MetadataException, IOException, InterruptedException, TException, AuthException { + String username = sessionManager.getUsername(sessionId); + plan.setLoginUserName(username); + // check permissions + if (!checkAuthorization(plan.getAuthPaths(), plan, username)) { + return RpcUtils.getTSExecuteStatementResp( + RpcUtils.getStatus( + TSStatusCode.NO_PERMISSION_ERROR, + "No permissions for this operation " + plan.getOperatorType())); + } + queryFrequencyRecorder.incrementAndGet(); AUDIT_LOGGER.debug( "Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement); @@ -616,102 +623,17 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If QueryContext context = genQueryContext(queryId, plan.isDebug(), queryStartTime, statement, timeout); - if (plan instanceof QueryPlan && ((QueryPlan) plan).isEnableTracing()) { - context.setEnableTracing(true); - tracingManager.setStartTime(queryId, this.startTime); - tracingManager.registerActivity( - queryId, - String.format(TracingConstant.ACTIVITY_START_EXECUTE, statement), - this.startTime); - tracingManager.registerActivity(queryId, TracingConstant.ACTIVITY_PARSE_SQL, queryStartTime); - if (!(plan instanceof AlignByDevicePlan)) { - tracingManager.setSeriesPathNum(queryId, plan.getPaths().size()); - } - } - + TSExecuteStatementResp resp; try { - String username = sessionManager.getUsername(sessionId); - plan.setLoginUserName(username); - - TSExecuteStatementResp resp = null; - // execute it before createDataSet since it may change the content of 
query plan - if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) { - resp = getQueryColumnHeaders(plan, username, isJdbcQuery); - } if (plan instanceof QueryPlan) { - ((QueryPlan) plan).setEnableRedirect(enableRedirect); - } - // create and cache dataset - QueryDataSet newDataSet = createQueryDataSet(context, plan, fetchSize); - if (plan instanceof QueryPlan && ((QueryPlan) plan).isEnableTracing()) { - tracingManager.registerActivity( - queryId, TracingConstant.ACTIVITY_CREATE_DATASET, System.currentTimeMillis()); - } - - if (newDataSet.getEndPoint() != null && enableRedirect) { - // redirect query - LOGGER.debug( - "need to redirect {} {} to node {}", statement, queryId, newDataSet.getEndPoint()); - TSStatus status = new TSStatus(); - status.setRedirectNode( - new EndPoint(newDataSet.getEndPoint().getIp(), newDataSet.getEndPoint().getPort())); - status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode()); - resp.setStatus(status); - resp.setQueryId(queryId); - return resp; - } - - if (plan instanceof ShowPlan || plan instanceof AuthorPlan) { - resp = getListDataSetHeaders(plan, newDataSet); - } else if (plan instanceof UDFPlan - || (plan instanceof QueryPlan && ((QueryPlan) plan).isGroupByLevel())) { - resp = getQueryColumnHeaders(plan, username, isJdbcQuery); - } - - resp.setOperationType(plan.getOperatorType().toString()); - if (newDataSet instanceof DirectNonAlignDataSet) { - resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username)); + QueryPlan queryPlan = (QueryPlan) plan; + queryPlan.setEnableRedirect(enableRedirect); + resp = executeQueryPlan(queryPlan, context, isJdbcQuery, fetchSize, username); } else { - try { - TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username); - resp.setQueryDataSet(tsQueryDataSet); - } catch (RedirectException e) { - LOGGER.debug("need to redirect {} {} to {}", statement, queryId, e.getEndPoint()); - if (enableRedirect) { - // redirect query - TSStatus status = 
new TSStatus(); - status.setRedirectNode(e.getEndPoint()); - status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode()); - resp.setStatus(status); - resp.setQueryId(queryId); - return resp; - } else { - LOGGER.error( - "execute {} error, if session does not support redirect," - + " should not throw redirection exception.", - statement, - e); - } - } + resp = executeShowOrAuthorPlan(plan, context, fetchSize, username); } - resp.setQueryId(queryId); - - if (plan instanceof AlignByDevicePlan && ((QueryPlan) plan).isEnableTracing()) { - tracingManager.setSeriesPathNum(queryId, ((AlignByDeviceDataSet) newDataSet).getPathsNum()); - } - - queryTimeManager.unRegisterQuery(queryId, false); - - if (plan instanceof QueryPlan && ((QueryPlan) plan).isEnableTracing()) { - tracingManager.registerActivity( - queryId, TracingConstant.ACTIVITY_REQUEST_COMPLETE, System.currentTimeMillis()); - - TSTracingInfo tsTracingInfo = fillRpcReturnTracingInfo(queryId); - resp.setTracingInfo(tsTracingInfo); - } - - return resp; + resp.setOperationType(plan.getOperatorType().toString()); } catch (Exception e) { sessionManager.releaseQueryResourceNoExceptions(queryId); throw e; @@ -722,9 +644,122 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement); } } + return resp; } - private TSExecuteStatementResp getListDataSetHeaders(PhysicalPlan plan, QueryDataSet dataSet) { + private TSExecuteStatementResp executeQueryPlan( + QueryPlan plan, QueryContext context, boolean isJdbcQuery, int fetchSize, String username) + throws TException, MetadataException, QueryProcessException, StorageEngineException, + SQLException, IOException, InterruptedException, QueryFilterOptimizationException, + AuthException { + if (plan.isEnableTracing()) { + context.setEnableTracing(true); + tracingManager.setStartTime(context.getQueryId(), this.startTime); + tracingManager.registerActivity( + context.getQueryId(), + 
String.format(TracingConstant.ACTIVITY_START_EXECUTE, context.getStatement()), + this.startTime); + tracingManager.registerActivity( + context.getQueryId(), TracingConstant.ACTIVITY_PARSE_SQL, context.getStartTime()); + } + + TSExecuteStatementResp resp = null; + // execute it before createDataSet since it may change the content of query plan + if (!(plan instanceof UDFPlan)) { + resp = plan.getTSExecuteStatementResp(isJdbcQuery); + } + // create and cache dataset + QueryDataSet newDataSet = createQueryDataSet(context, plan, fetchSize); + + if (plan.isEnableTracing()) { + tracingManager.registerActivity( + context.getQueryId(), + TracingConstant.ACTIVITY_CREATE_DATASET, + System.currentTimeMillis()); + } + + if (newDataSet.getEndPoint() != null && plan.isEnableRedirect()) { + // redirect query + LOGGER.debug( + "need to redirect {} {} to node {}", + context.getStatement(), + context.getQueryId(), + newDataSet.getEndPoint()); + TSStatus status = new TSStatus(); + status.setRedirectNode( + new EndPoint(newDataSet.getEndPoint().getIp(), newDataSet.getEndPoint().getPort())); + status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode()); + resp.setStatus(status); + resp.setQueryId(context.getQueryId()); + return resp; + } + + if (plan instanceof UDFPlan || plan.isGroupByLevel()) { + resp = plan.getTSExecuteStatementResp(isJdbcQuery); + } + + if (newDataSet instanceof DirectNonAlignDataSet) { + resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username)); + } else { + try { + TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username); + resp.setQueryDataSet(tsQueryDataSet); + } catch (RedirectException e) { + LOGGER.debug( + "need to redirect {} {} to {}", + context.getStatement(), + context.getQueryId(), + e.getEndPoint()); + if (plan.isEnableRedirect()) { + // redirect query + TSStatus status = new TSStatus(); + status.setRedirectNode(e.getEndPoint()); + status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode()); 
+ resp.setStatus(status); + resp.setQueryId(context.getQueryId()); + return resp; + } else { + LOGGER.error( + "execute {} error, if session does not support redirect," + + " should not throw redirection exception.", + context.getStatement(), + e); + } + } + } + queryTimeManager.unRegisterQuery(context.getQueryId(), false); + + if (plan.isEnableTracing()) { + tracingManager.registerActivity( + context.getQueryId(), + TracingConstant.ACTIVITY_REQUEST_COMPLETE, + System.currentTimeMillis()); + + TSTracingInfo tsTracingInfo = fillRpcReturnTracingInfo(context.getQueryId()); + resp.setTracingInfo(tsTracingInfo); + } + return resp; + } + + private TSExecuteStatementResp executeShowOrAuthorPlan( + PhysicalPlan plan, QueryContext context, int fetchSize, String username) + throws QueryProcessException, TException, StorageEngineException, SQLException, IOException, + InterruptedException, QueryFilterOptimizationException, MetadataException, AuthException { + // create and cache dataset + QueryDataSet newDataSet = createQueryDataSet(context, plan, fetchSize); + TSExecuteStatementResp resp = getListDataSetResp(plan, newDataSet); + + resp.setQueryDataSet(fillRpcReturnData(fetchSize, newDataSet, username)); + queryTimeManager.unRegisterQuery(context.getQueryId(), false); + return resp; + } + + /** + * Construct TSExecuteStatementResp and the header of list result set. 
+ * + * @param plan may be a ShowPlan or an AuthorPlan + */ + private TSExecuteStatementResp getListDataSetResp(PhysicalPlan plan, QueryDataSet dataSet) { TSExecuteStatementResp resp = StaticResps.getNoTimeExecuteResp( dataSet.getPaths().stream().map(Path::getFullPath).collect(Collectors.toList()), @@ -735,21 +770,6 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If return resp; } - /** get ResultSet schema */ - private TSExecuteStatementResp getQueryColumnHeaders( - PhysicalPlan physicalPlan, String username, boolean isJdbcQuery) - throws AuthException, TException, MetadataException { - // check permissions - if (!checkAuthorization(physicalPlan.getAuthPaths(), physicalPlan, username)) { - return RpcUtils.getTSExecuteStatementResp( - RpcUtils.getStatus( - TSStatusCode.NO_PERMISSION_ERROR, - "No permissions for this operation " + physicalPlan.getOperatorType())); - } - - return ((QueryPlan) physicalPlan).getTSExecuteStatementResp(isJdbcQuery); - } - private TSExecuteStatementResp executeSelectIntoStatement( String statement, long statementId,
