This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch 0.12controlQueryThread in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 53c15030ee2f8d48a978fae1834603602d611553 Author: Alima777 <[email protected]> AuthorDate: Mon Jan 3 15:42:00 2022 +0800 wrap internalExecuteQueryStatement with QueryTask() --- .../org/apache/iotdb/db/concurrent/ThreadName.java | 3 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 19 +- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 10 + .../db/query/dataset/NonAlignEngineDataSet.java | 4 +- .../dataset/RawQueryDataSetWithoutValueFilter.java | 5 +- ...yTaskPoolManager.java => QueryTaskManager.java} | 18 +- ...nager.java => RawQueryReadTaskPoolManager.java} | 30 +- .../org/apache/iotdb/db/service/TSServiceImpl.java | 357 ++++++++++++--------- 8 files changed, 262 insertions(+), 184 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java index 5f83c12..ad1e613 100644 --- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java +++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java @@ -43,7 +43,8 @@ public enum ThreadName { SYNC_MONITOR("Sync-Monitor"), LOAD_TSFILE("Load-TsFile"), TIME_COST_STATISTIC("TIME_COST_STATISTIC"), - QUERY_SERVICE("Query"); + QUERY_SERVICE("Query"), + SUB_RAW_QUERY_SERVICE("Sub_RawQuery"); private String name; diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index b880ae5..05699c6 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -238,8 +238,13 @@ public class IoTDBConfig { /** How many threads can concurrently flush. When <= 0, use CPU core number. */ private int concurrentFlushThread = Runtime.getRuntime().availableProcessors(); - /** How many threads can concurrently query. When <= 0, use CPU core number. */ - private int concurrentQueryThread = 8; + /** How many threads can concurrently execute query statement. When <= 0, use CPU core number. */ + private int concurrentQueryThread = 16; + + /** + * How many threads can concurrently read data for raw data query. When <= 0, use CPU core number. + */ + private int concurrentSubRawQueryThread = 8; /** Is the write mem control for writing enable. */ private boolean enableMemControl = true; @@ -1091,10 +1096,18 @@ public class IoTDBConfig { return concurrentQueryThread; } - void setConcurrentQueryThread(int concurrentQueryThread) { + public void setConcurrentQueryThread(int concurrentQueryThread) { this.concurrentQueryThread = concurrentQueryThread; } + public int getConcurrentSubRawQueryThread() { + return concurrentSubRawQueryThread; + } + + void setConcurrentSubRawQueryThread(int concurrentSubRawQueryThread) { + this.concurrentSubRawQueryThread = concurrentSubRawQueryThread; + } + public long getSeqTsFileSize() { return seqTsFileSize; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index a2c6d3d..10825e9 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -426,6 +426,16 @@ public class IoTDBDescriptor { conf.setConcurrentQueryThread(Runtime.getRuntime().availableProcessors()); } + conf.setConcurrentSubRawQueryThread( + Integer.parseInt( + properties.getProperty( + "concurrent_sub_rawQuery_thread", + Integer.toString(conf.getConcurrentSubRawQueryThread())))); + + if (conf.getConcurrentSubRawQueryThread() <= 0) { + conf.setConcurrentSubRawQueryThread(Runtime.getRuntime().availableProcessors()); + } + conf.setmManagerCacheSize( Integer.parseInt( properties diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java index f0f887a..f17a62d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java @@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.dataset; import org.apache.iotdb.db.concurrent.WrappedRunnable; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.query.control.QueryTimeManager; -import org.apache.iotdb.db.query.pool.QueryTaskPoolManager; +import org.apache.iotdb.db.query.pool.RawQueryReadTaskPoolManager; import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader; import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet; @@ -239,7 +239,7 @@ public class NonAlignEngineDataSet extends QueryDataSet implements DirectNonAlig /** flag that main thread is interrupted or not */ private volatile boolean interrupted = false; - private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance(); + private static final RawQueryReadTaskPoolManager pool = RawQueryReadTaskPoolManager.getInstance(); private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class); diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java index a5f613d..1df8c4f 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java @@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.dataset; import org.apache.iotdb.db.concurrent.WrappedRunnable; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; import org.apache.iotdb.db.query.control.QueryTimeManager; -import org.apache.iotdb.db.query.pool.QueryTaskPoolManager; +import org.apache.iotdb.db.query.pool.RawQueryReadTaskPoolManager; import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader; import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; import org.apache.iotdb.db.utils.datastructure.TimeSelector; @@ -169,7 +169,8 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet private final long queryId; - private static final QueryTaskPoolManager TASK_POOL_MANAGER = QueryTaskPoolManager.getInstance(); + private static final RawQueryReadTaskPoolManager TASK_POOL_MANAGER = + RawQueryReadTaskPoolManager.getInstance(); private static final Logger LOGGER = LoggerFactory.getLogger(RawQueryDataSetWithoutValueFilter.class); diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskManager.java similarity index 82% copy from server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java copy to server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskManager.java index 1dcda50..c68f526 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskManager.java @@ -27,11 +27,17 @@ import org.apache.iotdb.db.engine.flush.pool.AbstractPoolManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class QueryTaskPoolManager extends AbstractPoolManager { +/** + * This pool is used to execute all query task send from client, and return TSExecuteStatementResp. + * Thread named by Query. + * + * <p>Execute QueryTask() in TSServiceImpl + */ +public class QueryTaskManager extends AbstractPoolManager { - private static final Logger LOGGER = LoggerFactory.getLogger(QueryTaskPoolManager.class); + private static final Logger LOGGER = LoggerFactory.getLogger(QueryTaskManager.class); - private QueryTaskPoolManager() { + private QueryTaskManager() { int threadCnt = Math.min( Runtime.getRuntime().availableProcessors(), @@ -39,8 +45,8 @@ public class QueryTaskPoolManager extends AbstractPoolManager { pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.QUERY_SERVICE.getName()); } - public static QueryTaskPoolManager getInstance() { - return QueryTaskPoolManager.InstanceHolder.instance; + public static QueryTaskManager getInstance() { + return QueryTaskManager.InstanceHolder.instance; } @Override @@ -79,6 +85,6 @@ public class QueryTaskPoolManager extends AbstractPoolManager { // allowed to do nothing } - private static QueryTaskPoolManager instance = new QueryTaskPoolManager(); + private static QueryTaskManager instance = new QueryTaskManager(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/RawQueryReadTaskPoolManager.java similarity index 67% rename from server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java rename to server/src/main/java/org/apache/iotdb/db/query/pool/RawQueryReadTaskPoolManager.java index 1dcda50..dc84652 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/pool/RawQueryReadTaskPoolManager.java @@ -27,20 +27,27 @@ import org.apache.iotdb.db.engine.flush.pool.AbstractPoolManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class QueryTaskPoolManager extends AbstractPoolManager { +/** + * This thread pool is used to read data for raw data query. Thread named by Sub_Raw_Query. + * + * <p>Execute ReadTask() in RawQueryReadTaskPoolManager + */ +public class RawQueryReadTaskPoolManager extends AbstractPoolManager { - private static final Logger LOGGER = LoggerFactory.getLogger(QueryTaskPoolManager.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RawQueryReadTaskPoolManager.class); - private QueryTaskPoolManager() { + private RawQueryReadTaskPoolManager() { int threadCnt = Math.min( Runtime.getRuntime().availableProcessors(), - IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread()); - pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.QUERY_SERVICE.getName()); + IoTDBDescriptor.getInstance().getConfig().getConcurrentSubRawQueryThread()); + pool = + IoTDBThreadPoolFactory.newFixedThreadPool( + threadCnt, ThreadName.SUB_RAW_QUERY_SERVICE.getName()); } - public static QueryTaskPoolManager getInstance() { - return QueryTaskPoolManager.InstanceHolder.instance; + public static RawQueryReadTaskPoolManager getInstance() { + return RawQueryReadTaskPoolManager.InstanceHolder.instance; } @Override @@ -50,7 +57,7 @@ public class QueryTaskPoolManager extends AbstractPoolManager { @Override public String getName() { - return "query task"; + return "raw query read task"; } @Override @@ -59,9 +66,10 @@ public class QueryTaskPoolManager extends AbstractPoolManager { int threadCnt = Math.min( Runtime.getRuntime().availableProcessors(), - IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread()); + IoTDBDescriptor.getInstance().getConfig().getConcurrentSubRawQueryThread()); pool = - IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.QUERY_SERVICE.getName()); + IoTDBThreadPoolFactory.newFixedThreadPool( + threadCnt, ThreadName.SUB_RAW_QUERY_SERVICE.getName()); } } @@ -79,6 +87,6 @@ public class QueryTaskPoolManager extends AbstractPoolManager { // allowed to do nothing } - private static QueryTaskPoolManager instance = new QueryTaskPoolManager(); + private static RawQueryReadTaskPoolManager instance = new RawQueryReadTaskPoolManager(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 3fd9bee..45988da 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -80,6 +80,7 @@ import org.apache.iotdb.db.query.control.TracingManager; import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet; import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet; import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet; +import org.apache.iotdb.db.query.pool.QueryTaskManager; import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder; import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; import org.apache.iotdb.db.utils.QueryDataSetUtils; @@ -145,6 +146,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -211,6 +214,151 @@ public class TSServiceImpl implements TSIService.Iface { TimeUnit.MINUTES); } + /** + * Execute query statement, return TSExecuteStatementResp with dataset. + * + * @param plan must be a plan for Query: QueryPlan, ShowPlan, and some AuthorPlan + */ + protected class QueryTask implements Callable<TSExecuteStatementResp> { + + private PhysicalPlan plan; + private final String username; + private final String statement; + private final long statementId; + private final long timeout; + private final int fetchSize; + private final boolean enableRedirectQuery; + + public QueryTask( + PhysicalPlan plan, + String username, + String statement, + long statementId, + long timeout, + int fetchSize, + boolean enableRedirectQuery) { + this.plan = plan; + this.username = username; + this.statement = statement; + this.statementId = statementId; + this.timeout = timeout; + this.fetchSize = fetchSize; + this.enableRedirectQuery = enableRedirectQuery; + } + + @Override + public TSExecuteStatementResp call() throws Exception { + queryCount.incrementAndGet(); + AUDIT_LOGGER.debug( + "Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement); + long startTime = System.currentTimeMillis(); + // generate the queryId for the operation + long queryId = sessionManager.requestQueryId(statementId, true); + try { + // register query info to queryTimeManager + if (!(plan instanceof ShowQueryProcesslistPlan)) { + queryTimeManager.registerQuery(queryId, startTime, statement, timeout); + } + if (plan instanceof QueryPlan && config.isEnablePerformanceTracing()) { + TracingManager tracingManager = TracingManager.getInstance(); + if (!(plan instanceof AlignByDevicePlan)) { + tracingManager.writeQueryInfo(queryId, statement, startTime, plan.getPaths().size()); + } else { + tracingManager.writeQueryInfo(queryId, statement, startTime); + } + } + + if (plan instanceof AuthorPlan) { + plan.setLoginUserName(username); + } + + TSExecuteStatementResp resp = null; + // execute it before createDataSet since it may change the content of query plan + if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) { + resp = getQueryColumnHeaders(plan, username); + } + if (plan instanceof QueryPlan) { + ((QueryPlan) plan).setEnableRedirect(enableRedirectQuery); + } + // create and cache dataset + QueryDataSet newDataSet = createQueryDataSet(queryId, plan, fetchSize); + + if (newDataSet.getEndPoint() != null && enableRedirectQuery) { + LOGGER.debug( + "need to redirect {} {} to node {}", statement, queryId, newDataSet.getEndPoint()); + QueryDataSet.EndPoint endPoint = newDataSet.getEndPoint(); + return redirectQueryToAnotherNode(resp, queryId, endPoint.getIp(), endPoint.getPort()); + } + + if (plan instanceof ShowPlan || plan instanceof AuthorPlan) { + resp = getListDataSetHeaders(newDataSet); + } else if (plan instanceof UDFPlan) { + resp = getQueryColumnHeaders(plan, username); + } + + resp.setOperationType(plan.getOperatorType().toString()); + if (plan.getOperatorType() == OperatorType.AGGREGATION) { + resp.setIgnoreTimeStamp(true); + } else if (plan instanceof ShowQueryProcesslistPlan) { + resp.setIgnoreTimeStamp(false); + } + + if (newDataSet instanceof DirectNonAlignDataSet) { + resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username)); + } else { + try { + TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username); + resp.setQueryDataSet(tsQueryDataSet); + } catch (RedirectException e) { + LOGGER.debug("need to redirect {} {} to {}", statement, queryId, e.getEndPoint()); + if (enableRedirectQuery) { + EndPoint endPoint = e.getEndPoint(); + redirectQueryToAnotherNode(resp, queryId, endPoint.ip, endPoint.port); + } else { + LOGGER.error( + "execute {} error, if session does not support redirect," + + " should not throw redirection exception.", + statement, + e); + } + } + } + resp.setQueryId(queryId); + + if (plan instanceof AlignByDevicePlan && config.isEnablePerformanceTracing()) { + TracingManager.getInstance() + .writePathsNum(queryId, ((AlignByDeviceDataSet) newDataSet).getPathsNum()); + } + + if (enableMetric) { + long endTime = System.currentTimeMillis(); + SqlArgument sqlArgument = new SqlArgument(resp, plan, statement, startTime, endTime); + synchronized (sqlArgumentList) { + sqlArgumentList.add(sqlArgument); + if (sqlArgumentList.size() >= MAX_SIZE) { + sqlArgumentList.subList(0, DELETE_SIZE).clear(); + } + } + } + + // remove query info in QueryTimeManager + if (!(plan instanceof ShowQueryProcesslistPlan)) { + queryTimeManager.unRegisterQuery(queryId); + } + return resp; + } catch (Exception e) { + releaseQueryResourceNoExceptions(queryId); + throw e; + } finally { + Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime); + long costTime = System.currentTimeMillis() - startTime; + if (costTime >= config.getSlowQueryThreshold()) { + SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement); + } + } + } + } + public static List<SqlArgument> getSqlArgumentList() { return sqlArgumentList; } @@ -603,16 +751,22 @@ public class TSServiceImpl implements TSIService.Iface { processor.parseSQLToPhysicalPlan( statement, sessionManager.getZoneId(req.getSessionId()), req.fetchSize); - return physicalPlan.isQuery() - ? internalExecuteQueryStatement( - statement, - req.statementId, - physicalPlan, - req.fetchSize, - req.timeout, - sessionManager.getUsername(req.getSessionId()), - req.isEnableRedirectQuery()) - : executeUpdateStatement(physicalPlan, req.getSessionId()); + if (physicalPlan.isQuery()) { + Future<TSExecuteStatementResp> resp = + QueryTaskManager.getInstance() + .submit( + new QueryTask( + physicalPlan, + sessionManager.getUsername(req.sessionId), + req.statement, + req.statementId, + req.timeout, + req.fetchSize, + req.enableRedirectQuery)); + return resp.get(); + } else { + return executeUpdateStatement(physicalPlan, req.getSessionId()); + } } catch (InterruptedException e) { LOGGER.error(INFO_INTERRUPT_ERROR, req, e); Thread.currentThread().interrupt(); @@ -636,17 +790,23 @@ public class TSServiceImpl implements TSIService.Iface { processor.parseSQLToPhysicalPlan( statement, sessionManager.getZoneId(req.sessionId), req.fetchSize); - return physicalPlan.isQuery() - ? internalExecuteQueryStatement( - statement, - req.statementId, - physicalPlan, - req.fetchSize, - req.timeout, - sessionManager.getUsername(req.getSessionId()), - req.isEnableRedirectQuery()) - : RpcUtils.getTSExecuteStatementResp( - TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement."); + if (physicalPlan.isQuery()) { + Future<TSExecuteStatementResp> resp = + QueryTaskManager.getInstance() + .submit( + new QueryTask( + physicalPlan, + sessionManager.getUsername(req.sessionId), + req.statement, + req.statementId, + req.timeout, + req.fetchSize, + req.enableRedirectQuery)); + return resp.get(); + } else { + return RpcUtils.getTSExecuteStatementResp( + TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement."); + } } catch (InterruptedException e) { LOGGER.error(INFO_INTERRUPT_ERROR, req, e); Thread.currentThread().interrupt(); @@ -667,17 +827,23 @@ public class TSServiceImpl implements TSIService.Iface { PhysicalPlan physicalPlan = processor.rawDataQueryReqToPhysicalPlan(req, sessionManager.getZoneId(req.sessionId)); - return physicalPlan.isQuery() - ? internalExecuteQueryStatement( - "", - req.statementId, - physicalPlan, - req.fetchSize, - config.getQueryTimeoutThreshold(), - sessionManager.getUsername(req.sessionId), - req.isEnableRedirectQuery()) - : RpcUtils.getTSExecuteStatementResp( - TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement."); + if (physicalPlan.isQuery()) { + Future<TSExecuteStatementResp> resp = + QueryTaskManager.getInstance() + .submit( + new QueryTask( + physicalPlan, + sessionManager.getUsername(req.sessionId), + "", + req.statementId, + config.getQueryTimeoutThreshold(), + req.fetchSize, + req.enableRedirectQuery)); + return resp.get(); + } else { + return RpcUtils.getTSExecuteStatementResp( + TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement."); + } } catch (InterruptedException e) { LOGGER.error(INFO_INTERRUPT_ERROR, req, e); Thread.currentThread().interrupt(); @@ -689,133 +855,6 @@ public class TSServiceImpl implements TSIService.Iface { } } - /** - * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, UDFPlan, - * some AuthorPlan - */ - @SuppressWarnings({"squid:S3776", "squid:S1141"}) // Suppress high Cognitive Complexity warning - private TSExecuteStatementResp internalExecuteQueryStatement( - String statement, - long statementId, - PhysicalPlan plan, - int fetchSize, - long timeout, - String username, - boolean enableRedirect) - throws QueryProcessException, SQLException, StorageEngineException, - QueryFilterOptimizationException, MetadataException, IOException, InterruptedException, - TException, AuthException { - queryCount.incrementAndGet(); - AUDIT_LOGGER.debug( - "Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement); - long startTime = System.currentTimeMillis(); - long queryId = -1; - try { - // generate the queryId for the operation - queryId = sessionManager.requestQueryId(statementId, true); - // register query info to queryTimeManager - if (!(plan instanceof ShowQueryProcesslistPlan)) { - queryTimeManager.registerQuery(queryId, startTime, statement, timeout); - } - if (plan instanceof QueryPlan && config.isEnablePerformanceTracing()) { - TracingManager tracingManager = TracingManager.getInstance(); - if (!(plan instanceof AlignByDevicePlan)) { - tracingManager.writeQueryInfo(queryId, statement, startTime, plan.getPaths().size()); - } else { - tracingManager.writeQueryInfo(queryId, statement, startTime); - } - } - - if (plan instanceof AuthorPlan) { - plan.setLoginUserName(username); - } - - TSExecuteStatementResp resp = null; - // execute it before createDataSet since it may change the content of query plan - if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) { - resp = getQueryColumnHeaders(plan, username); - } - if (plan instanceof QueryPlan) { - ((QueryPlan) plan).setEnableRedirect(enableRedirect); - } - // create and cache dataset - QueryDataSet newDataSet = createQueryDataSet(queryId, plan, fetchSize); - - if (newDataSet.getEndPoint() != null && enableRedirect) { - LOGGER.debug( - "need to redirect {} {} to node {}", statement, queryId, newDataSet.getEndPoint()); - QueryDataSet.EndPoint endPoint = newDataSet.getEndPoint(); - return redirectQueryToAnotherNode(resp, queryId, endPoint.getIp(), endPoint.getPort()); - } - - if (plan instanceof ShowPlan || plan instanceof AuthorPlan) { - resp = getListDataSetHeaders(newDataSet); - } else if (plan instanceof UDFPlan) { - resp = getQueryColumnHeaders(plan, username); - } - - resp.setOperationType(plan.getOperatorType().toString()); - if (plan.getOperatorType() == OperatorType.AGGREGATION) { - resp.setIgnoreTimeStamp(true); - } else if (plan instanceof ShowQueryProcesslistPlan) { - resp.setIgnoreTimeStamp(false); - } - - if (newDataSet instanceof DirectNonAlignDataSet) { - resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username)); - } else { - try { - TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username); - resp.setQueryDataSet(tsQueryDataSet); - } catch (RedirectException e) { - LOGGER.debug("need to redirect {} {} to {}", statement, queryId, e.getEndPoint()); - if (enableRedirect) { - EndPoint endPoint = e.getEndPoint(); - redirectQueryToAnotherNode(resp, queryId, endPoint.ip, endPoint.port); - } else { - LOGGER.error( - "execute {} error, if session does not support redirect," - + " should not throw redirection exception.", - statement, - e); - } - } - } - resp.setQueryId(queryId); - - if (plan instanceof AlignByDevicePlan && config.isEnablePerformanceTracing()) { - TracingManager.getInstance() - .writePathsNum(queryId, ((AlignByDeviceDataSet) newDataSet).getPathsNum()); - } - - if (enableMetric) { - long endTime = System.currentTimeMillis(); - SqlArgument sqlArgument = new SqlArgument(resp, plan, statement, startTime, endTime); - synchronized (sqlArgumentList) { - sqlArgumentList.add(sqlArgument); - if (sqlArgumentList.size() >= MAX_SIZE) { - sqlArgumentList.subList(0, DELETE_SIZE).clear(); - } - } - } - - // remove query info in QueryTimeManager - if (!(plan instanceof ShowQueryProcesslistPlan)) { - queryTimeManager.unRegisterQuery(queryId); - } - return resp; - } catch (Exception e) { - releaseQueryResourceNoExceptions(queryId); - throw e; - } finally { - Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime); - long costTime = System.currentTimeMillis() - startTime; - if (costTime >= config.getSlowQueryThreshold()) { - SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement); - } - } - } - /** Redirect query */ private TSExecuteStatementResp redirectQueryToAnotherNode( TSExecuteStatementResp resp, long queryId, String ip, int port) {
