This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch optPhysical in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ca28d5124eb705aa92dbf614cf0ddea76012f7c2 Author: Alima777 <[email protected]> AuthorDate: Tue Jun 22 14:53:35 2021 +0800 restructure some components in tsserviceImpl --- .../db/qp/physical/crud/AlignByDevicePlan.java | 6 ++- .../iotdb/db/qp/physical/crud/QueryPlan.java | 2 + .../db/qp/physical/crud/RawDataQueryPlan.java | 5 ++ .../iotdb/db/query/control/TracingManager.java | 10 ++++ .../db/query/dataset/AlignByDeviceDataSet.java | 4 -- .../org/apache/iotdb/db/service/TSServiceImpl.java | 53 ++++++++-------------- 6 files changed, 40 insertions(+), 40 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java index 55a75eb..44c92cc 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java @@ -46,7 +46,6 @@ public class AlignByDevicePlan extends QueryPlan { private Map<String, TSDataType> measurementDataTypeMap; private GroupByTimePlan groupByTimePlan; - private FillQueryPlan fillQueryPlan; private AggregationPlan aggregationPlan; @@ -142,6 +141,11 @@ public class AlignByDevicePlan extends QueryPlan { this.setOperatorType(Operator.OperatorType.AGGREGATION); } + @Override + public int getPathsNumForQuery() { + return measurements.size() * devices.size(); + } + /** * Exist: the measurements which don't belong to NonExist and Constant. NonExist: the measurements * that do not exist in any device, data type is considered as String. The value is considered as diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java index 7599424..d5d8eaf 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java @@ -181,4 +181,6 @@ public abstract class QueryPlan extends PhysicalPlan { public void setWithoutAllNull(boolean withoutAllNull) { this.withoutAllNull = withoutAllNull; } + + public abstract int getPathsNumForQuery(); } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java index 796acc3..43c474e 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java @@ -243,4 +243,9 @@ public class RawDataQueryPlan extends QueryPlan { public boolean isRawQuery() { return true; } + + @Override + public int getPathsNumForQuery() { + return deduplicatedPaths.size(); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java index 450c4f4..5e19de3 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java @@ -22,6 +22,8 @@ import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +73,14 @@ public class TracingManager { return TracingManagerHelper.INSTANCE; } + public void writeQueryInfo(long queryId, PhysicalPlan plan, String statement, long startTime) + throws IOException { + if (plan instanceof QueryPlan + && IoTDBDescriptor.getInstance().getConfig().isEnablePerformanceTracing()) { + writeQueryInfo(queryId, statement, startTime, ((QueryPlan) plan).getPathsNumForQuery()); + } + } + public void writeQueryInfo(long queryId, String statement, long startTime, int pathsNum) throws IOException { queryStartTime.put(queryId, startTime); diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java index 829f3f1..a7b18f9 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java @@ -119,10 +119,6 @@ public class AlignByDeviceDataSet extends QueryDataSet { this.deviceIterator = devices.iterator(); } - public int getPathsNum() { - return pathsNum; - } - @Override @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning public boolean hasNextWithoutConstraint() throws IOException { diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 6a2c8e8..88afe66 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -76,7 +76,6 @@ import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.control.QueryTimeManager; import org.apache.iotdb.db.query.control.TracingManager; -import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet; import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet; import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet; import org.apache.iotdb.db.query.dataset.UDTFDataSet; @@ -791,34 +790,14 @@ public class TSServiceImpl implements TSIService.Iface { long startTime = System.currentTimeMillis(); long queryId = -1; try { - - // pair.left = fetchSize, pair.right = deduplicatedNum - Pair<Integer, Integer> p = getMemoryParametersFromPhysicalPlan(plan, fetchSize); - fetchSize = p.left; - - // generate the queryId for the operation - queryId = generateQueryId(true, fetchSize, p.right); - // register query info to queryTimeManager - if (!(plan instanceof ShowQueryProcesslistPlan)) { - queryTimeManager.registerQuery(queryId, startTime, statement, timeout); - } - if (plan instanceof QueryPlan && config.isEnablePerformanceTracing()) { - TracingManager tracingManager = TracingManager.getInstance(); - if (!(plan instanceof AlignByDevicePlan)) { - tracingManager.writeQueryInfo(queryId, statement, startTime, plan.getPaths().size()); - } else { - tracingManager.writeQueryInfo(queryId, statement, startTime); - } - } - - statementId2QueryId - .computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>()) - .add(queryId); - if (plan instanceof AuthorPlan) { plan.setLoginUserName(username); } + queryId = registerQueryId(plan, statementId, fetchSize); + queryTimeManager.registerQuery(queryId, startTime, statement, timeout); + TracingManager.getInstance().writeQueryInfo(queryId, plan, statement, startTime); + TSExecuteStatementResp resp = null; // execute it before createDataSet since it may change the content of query plan if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) { @@ -827,6 +806,7 @@ public class TSServiceImpl implements TSIService.Iface { if (plan instanceof QueryPlan) { ((QueryPlan) plan).setEnableRedirect(enableRedirect); } + // create and cache dataset QueryDataSet newDataSet = createQueryDataSet(queryId, plan, fetchSize); @@ -848,8 +828,9 @@ public class TSServiceImpl implements TSIService.Iface { } else if (plan instanceof UDFPlan) { resp = getQueryColumnHeaders(plan, username); } - + resp.setQueryId(queryId); resp.setOperationType(plan.getOperatorType().toString()); + if (plan.getOperatorType() == OperatorType.AGGREGATION) { resp.setIgnoreTimeStamp(true); } else if (plan instanceof ShowQueryProcesslistPlan) { @@ -881,12 +862,6 @@ public class TSServiceImpl implements TSIService.Iface { } } } - resp.setQueryId(queryId); - - if (plan instanceof AlignByDevicePlan && config.isEnablePerformanceTracing()) { - TracingManager.getInstance() - .writePathsNum(queryId, ((AlignByDeviceDataSet) newDataSet).getPathsNum()); - } if (enableMetric) { long endTime = System.currentTimeMillis(); @@ -900,9 +875,7 @@ public class TSServiceImpl implements TSIService.Iface { } // remove query info in QueryTimeManager - if (!(plan instanceof ShowQueryProcesslistPlan)) { - queryTimeManager.unRegisterQuery(queryId); - } + queryTimeManager.unRegisterQuery(queryId); return resp; } catch (Exception e) { releaseQueryResourceNoExceptions(queryId); @@ -916,6 +889,16 @@ public class TSServiceImpl implements TSIService.Iface { } } + private long registerQueryId(PhysicalPlan plan, long statementId, int fetchSize) { + // pair.left = fetchSize, pair.right = deduplicatedNum + Pair<Integer, Integer> p = getMemoryParametersFromPhysicalPlan(plan, fetchSize); + // generate the queryId for the operation + long queryId = generateQueryId(true, p.left, p.right); + statementId2QueryId.computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>()).add(queryId); + + return queryId; + } + /** * get fetchSize and deduplicatedPathNum that are used for memory estimation *
