This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit fe9937a8cfe6db48dd4b4d2cff31b10ebbc91f77 Author: lta <[email protected]> AuthorDate: Thu May 16 18:41:26 2019 +0800 add aggre feature without timegenerator --- .../executor/ClusterAggregateEngineExecutor.java | 114 ++++++++++++++++ .../cluster/query/executor/ClusterQueryRouter.java | 29 +++- .../querynode/ClusterLocalSingleQueryManager.java | 85 ++++++++++-- .../query/utils/QueryPlanPartitionUtils.java | 146 +++++++++++++++------ .../db/qp/executor/IQueryProcessExecutor.java | 4 +- .../db/query/executor/AggregateEngineExecutor.java | 75 +++++++---- .../iotdb/db/query/executor/EngineQueryRouter.java | 4 +- 7 files changed, 378 insertions(+), 79 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java new file mode 100644 index 0000000..b63b311 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.query.executor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; +import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.exception.FileNodeManagerException; +import org.apache.iotdb.db.exception.PathErrorException; +import org.apache.iotdb.db.exception.ProcessorException; +import org.apache.iotdb.db.metadata.MManager; +import org.apache.iotdb.db.query.aggregation.AggreResultData; +import org.apache.iotdb.db.query.aggregation.AggregateFunction; +import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc; +import org.apache.iotdb.db.query.aggregation.impl.MaxTimeAggrFunc; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.QueryResourceManager; +import org.apache.iotdb.db.query.dataset.AggreResultDataPointReader; +import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator; +import org.apache.iotdb.db.query.executor.AggregateEngineExecutor; +import org.apache.iotdb.db.query.factory.AggreFuncFactory; +import org.apache.iotdb.db.query.factory.SeriesReaderFactory; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader; +import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.expression.IExpression; +import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; + +public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor { + + private ClusterRpcSingleQueryManager queryManager; + + public ClusterAggregateEngineExecutor(List<Path> selectedSeries, List<String> aggres, + IExpression expression, ClusterRpcSingleQueryManager queryManager) { + super(selectedSeries, aggres, expression); + this.queryManager = queryManager; + } + + @Override + public QueryDataSet executeWithoutTimeGenerator(QueryContext context) + throws FileNodeManagerException, IOException, PathErrorException, ProcessorException { + Filter timeFilter = expression != null ? ((GlobalTimeExpression) expression).getFilter() : null; + Map<Path, ClusterSelectSeriesReader> selectPathReaders = queryManager.getSelectSeriesReaders(); + + List<Path> paths = new ArrayList<>(); + List<IPointReader> readers = new ArrayList<>(); + List<TSDataType> dataTypes = new ArrayList<>(); + for (int i = 0; i < selectedSeries.size(); i++) { + Path path = selectedSeries.get(i); + + if (selectPathReaders.containsKey(path)) { + ClusterSelectSeriesReader reader = selectPathReaders.get(path); + readers.add(reader); + dataTypes.add(reader.getDataType()); + } else { + paths.add(path); + // construct AggregateFunction + TSDataType tsDataType = MManager.getInstance() + .getSeriesType(selectedSeries.get(i).getFullPath()); + AggregateFunction function = AggreFuncFactory.getAggrFuncByName(aggres.get(i), tsDataType); + function.init(); + + QueryDataSource queryDataSource = QueryResourceManager.getInstance() + .getQueryDataSource(selectedSeries.get(i), context); + + // sequence reader for sealed tsfile, unsealed tsfile, memory + SequenceDataReader sequenceReader; + if (function instanceof MaxTimeAggrFunc || function instanceof LastAggrFunc) { + sequenceReader = new SequenceDataReader(queryDataSource.getSeqDataSource(), timeFilter, + context, true); + } else { + sequenceReader = new SequenceDataReader(queryDataSource.getSeqDataSource(), timeFilter, + context, false); + } + + // unseq reader for all chunk groups in unSeqFile, memory + PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance() + .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter); + + AggreResultData aggreResultData = aggregateWithoutTimeGenerator(function, + sequenceReader, unSeqMergeReader, timeFilter); + readers.add(new AggreResultDataPointReader(aggreResultData)); + } + } + QueryResourceManager.getInstance() + .beginQueryOfGivenQueryPaths(context.getJobId(), paths); + + return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, readers); + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java index 2fa4576..672ca9d 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java @@ -29,7 +29,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.ProcessorException; import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.executor.FillEngineExecutor; +import org.apache.iotdb.db.query.executor.AggregateEngineExecutor; import org.apache.iotdb.db.query.executor.IEngineQueryRouter; import org.apache.iotdb.db.query.fill.IFill; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; @@ -95,7 +95,32 @@ public class ClusterQueryRouter implements IEngineQueryRouter { public QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres, IExpression expression, QueryContext context) throws QueryFilterOptimizationException, FileNodeManagerException, IOException, PathErrorException, ProcessorException { - throw new UnsupportedOperationException(); + + ClusterRpcSingleQueryManager queryManager = ClusterRpcQueryManager.getInstance() + .getSingleQuery(context.getJobId()); + + try { + if (expression != null) { + IExpression optimizedExpression = ExpressionOptimizer.getInstance() + .optimize(expression, selectedSeries); + AggregateEngineExecutor engineExecutor = new ClusterAggregateEngineExecutor( + selectedSeries, aggres, optimizedExpression, queryManager); + if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) { + queryManager.initQueryResource(QueryType.GLOBAL_TIME, getReadDataConsistencyLevel()); + return engineExecutor.executeWithoutTimeGenerator(context); + } else { + queryManager.initQueryResource(QueryType.FILTER, getReadDataConsistencyLevel()); + return engineExecutor.executeWithTimeGenerator(context); + } + } else { + AggregateEngineExecutor engineExecutor = new ClusterAggregateEngineExecutor( + selectedSeries, aggres, null, queryManager); + queryManager.initQueryResource(QueryType.NO_FILTER, getReadDataConsistencyLevel()); + return engineExecutor.executeWithoutTimeGenerator(context); + } + } catch (QueryFilterOptimizationException | IOException | RaftConnectionException e) { + throw new FileNodeManagerException(e); + } } @Override diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java index f776477..76a141e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java @@ -30,9 +30,9 @@ import org.apache.iotdb.cluster.query.PathType; import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory; import org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterSelectSeriesBatchReader; import org.apache.iotdb.cluster.query.reader.querynode.ClusterFillSelectSeriesBatchReader; +import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReader; import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader; import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderByTimestamp; -import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReader; import org.apache.iotdb.cluster.query.reader.querynode.IClusterFilterSeriesBatchReader; import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest; import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest; @@ -54,6 +54,7 @@ import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.executor.AbstractExecutorWithoutTimeGenerator; +import org.apache.iotdb.db.query.executor.AggregateEngineExecutor; import org.apache.iotdb.db.query.fill.IFill; import org.apache.iotdb.db.query.fill.PreviousFill; import org.apache.iotdb.db.query.reader.IPointReader; @@ -63,7 +64,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.expression.ExpressionType; +import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; +import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.slf4j.Logger; @@ -134,16 +137,11 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM if (plan instanceof GroupByPlan) { throw new UnsupportedOperationException(); } else if (plan instanceof AggregationPlan) { - throw new UnsupportedOperationException(); + handleAggreSeriesReader(plan, context, response); } else if (plan instanceof FillQueryPlan) { - handleFillSeriesRerader(plan, context, response); + handleFillSeriesReader(plan, context, response); } else { - if (plan.getExpression() == null - || plan.getExpression().getType() == ExpressionType.GLOBAL_TIME) { - handleSelectReaderWithoutTimeGenerator(plan, context, response); - } else { - handleSelectReaderWithTimeGenerator(plan, context, response); - } + handleSelectSeriesReader(plan, context, response); } } if (queryPlanMap.containsKey(PathType.FILTER_PATH)) { @@ -158,7 +156,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM * * @param queryPlan fill query plan */ - private void handleFillSeriesRerader(QueryPlan queryPlan, QueryContext context, + private void handleFillSeriesReader(QueryPlan queryPlan, QueryContext context, InitSeriesReaderResponse response) throws FileNodeManagerException, PathErrorException, IOException { FillQueryPlan fillQueryPlan = (FillQueryPlan) queryPlan; @@ -190,6 +188,73 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM response.getSeriesDataTypes().put(PathType.SELECT_PATH, dataTypes); } + + /** + * Handle aggregation series reader + * + * @param queryPlan fill query plan + */ + private void handleAggreSeriesReader(QueryPlan queryPlan, QueryContext context, + InitSeriesReaderResponse response) + throws FileNodeManagerException, PathErrorException, IOException, QueryFilterOptimizationException, ProcessorException { + if (queryPlan.getExpression() == null + || queryPlan.getExpression().getType() == ExpressionType.GLOBAL_TIME) { + handleAggreSeriesReaderWithoutTimeGenerator(queryPlan,context,response); + } else { + handleSelectReaderWithTimeGenerator(queryPlan, context, response); + } + } + + /** + * Handle aggregation series reader without value filter + * + * @param queryPlan fill query plan + */ + private void handleAggreSeriesReaderWithoutTimeGenerator(QueryPlan queryPlan, QueryContext context, + InitSeriesReaderResponse response) + throws FileNodeManagerException, PathErrorException, IOException, QueryFilterOptimizationException, ProcessorException { + AggregationPlan fillQueryPlan = (AggregationPlan) queryPlan; + + List<Path> selectedPaths = fillQueryPlan.getPaths(); + QueryResourceManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedPaths); + + IExpression optimizedExpression = ExpressionOptimizer.getInstance() + .optimize(fillQueryPlan.getExpression(), selectedPaths); + AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor( + selectedPaths, fillQueryPlan.getAggregations(), optimizedExpression); + + List<IPointReader> readers = engineExecutor.constructAggreReadersWithoutTimeGenerator(context); + + List<TSDataType> dataTypes = engineExecutor.getDataTypes(); + + for (int i =0 ; i < selectedPaths.size(); i ++) { + Path path = selectedPaths.get(i); + selectSeriesReaders.put(path.getFullPath(), + new ClusterSelectSeriesBatchReader(dataTypes.get(i), readers.get(i))); + dataTypeMap.put(path.getFullPath(), dataTypes.get(i)); + } + + response.getSeriesDataTypes().put(PathType.SELECT_PATH, dataTypes); + } + + /** + * Handle select series query + * + * @param plan plan query plan + * @param context query context + * @param response response for coordinator node + */ + private void handleSelectSeriesReader(QueryPlan plan, QueryContext context, + InitSeriesReaderResponse response) + throws FileNodeManagerException, IOException, PathErrorException { + if (plan.getExpression() == null + || plan.getExpression().getType() == ExpressionType.GLOBAL_TIME) { + handleSelectReaderWithoutTimeGenerator(plan, context, response); + } else { + handleSelectReaderWithTimeGenerator(plan, context, response); + } + } + /** * Handle select series query with no filter or only global time filter * diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java index 546282a..fc0d401 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java @@ -32,8 +32,6 @@ import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan; import org.apache.iotdb.db.qp.physical.crud.GroupByPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; -import org.apache.iotdb.db.query.fill.IFill; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.expression.ExpressionType; import org.apache.iotdb.tsfile.read.expression.IExpression; @@ -55,13 +53,32 @@ public class QueryPlanPartitionUtils { throws PathErrorException { QueryPlan queryPLan = singleQueryManager.getOriginQueryPlan(); if (queryPLan instanceof FillQueryPlan) { - splitFillPlan((FillQueryPlan)queryPLan, singleQueryManager); + splitFillPlan(singleQueryManager); + } else if (queryPLan instanceof AggregationPlan) { + splitAggregationPlanBySelectPath(singleQueryManager); + } else if (queryPLan instanceof GroupByPlan) { + splitGroupByPlanBySelectPath(singleQueryManager); } else { splitQueryPlanBySelectPath(singleQueryManager); } } /** + * Split query plan with filter. + */ + public static void splitQueryPlanWithValueFilter( + ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException { + QueryPlan queryPlan = singleQueryManager.getOriginQueryPlan(); + if (queryPlan instanceof GroupByPlan) { + splitGroupByPlanWithFilter(singleQueryManager); + } else if (queryPlan instanceof AggregationPlan) { + splitAggregationPlanWithFilter(singleQueryManager); + } else { + splitQueryPlanWithFilter(singleQueryManager); + } + } + + /** * Split query plan by select paths */ private static void splitQueryPlanBySelectPath(ClusterRpcSingleQueryManager singleQueryManager) @@ -88,33 +105,100 @@ public class QueryPlanPartitionUtils { } } + /** - * Split query plan with filter. + * Split query plan by filter paths */ - public static void splitQueryPlanWithValueFilter( - ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException { + private static void splitQueryPlanByFilterPath(ClusterRpcSingleQueryManager singleQueryManager) + throws PathErrorException { QueryPlan queryPlan = singleQueryManager.getOriginQueryPlan(); - if (queryPlan instanceof GroupByPlan) { - splitGroupByPlan((GroupByPlan) queryPlan, singleQueryManager); - } else if (queryPlan instanceof AggregationPlan) { - splitAggregationPlan((AggregationPlan) queryPlan, singleQueryManager); - } else { - splitQueryPlan(queryPlan, singleQueryManager); + // split query plan by filter path + Map<String, FilterGroupEntity> filterGroupEntityMap = singleQueryManager + .getFilterGroupEntityMap(); + IExpression expression = queryPlan.getExpression(); + ExpressionUtils.getAllExpressionSeries(expression, filterGroupEntityMap); + for (FilterGroupEntity filterGroupEntity : filterGroupEntityMap.values()) { + List<Path> filterSeriesList = filterGroupEntity.getFilterPaths(); + // create filter sub query plan + QueryPlan subQueryPlan = new QueryPlan(); + subQueryPlan.setPaths(filterSeriesList); + IExpression subExpression = ExpressionUtils + .pruneFilterTree(expression.clone(), filterSeriesList); + if (subExpression.getType() != ExpressionType.TRUE) { + subQueryPlan.setExpression(subExpression); + } + filterGroupEntity.setQueryPlan(subQueryPlan); } } - private static void splitGroupByPlan(GroupByPlan groupByPlan, + /** + * Split group by plan by select path + */ + private static void splitGroupByPlanBySelectPath( ClusterRpcSingleQueryManager singleQueryManager) { throw new UnsupportedOperationException(); } - private static void splitAggregationPlan(AggregationPlan aggregationPlan, - ClusterRpcSingleQueryManager singleQueryManager) { - throw new UnsupportedOperationException(); + /** + * Split group by plan with filter path + */ + private static void splitGroupByPlanWithFilter(ClusterRpcSingleQueryManager singleQueryManager) + throws PathErrorException { + splitGroupByPlanBySelectPath(singleQueryManager); + splitQueryPlanByFilterPath(singleQueryManager); } - private static void splitFillPlan(FillQueryPlan fillQueryPlan, - ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException { + /** + * Split aggregation plan by select path + */ + private static void splitAggregationPlanBySelectPath( + ClusterRpcSingleQueryManager singleQueryManager) + throws PathErrorException { + AggregationPlan queryPlan = (AggregationPlan) singleQueryManager.getOriginQueryPlan(); + List<Path> selectPaths = queryPlan.getPaths(); + List<String> aggregations = new ArrayList<>(); + Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId(); + Map<String, List<String>> selectAggregationByGroupId = new HashMap<>(); + Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans(); + for (int i = 0; i < selectPaths.size(); i++) { + Path path = selectPaths.get(i); + String aggregation = aggregations.get(i); + String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice()); + if (!selectSeriesByGroupId.containsKey(groupId)) { + selectSeriesByGroupId.put(groupId, new ArrayList<>()); + selectAggregationByGroupId.put(groupId, new ArrayList<>()); + } + selectAggregationByGroupId.get(groupId).add(aggregation); + selectSeriesByGroupId.get(groupId).add(path); + } + for (Entry<String, List<Path>> entry : selectSeriesByGroupId.entrySet()) { + String groupId = entry.getKey(); + List<Path> paths = entry.getValue(); + AggregationPlan subQueryPlan = new AggregationPlan(); + subQueryPlan.setProposer(queryPlan.getProposer()); + subQueryPlan.setPaths(paths); + subQueryPlan.setExpression(queryPlan.getExpression()); + subQueryPlan.setAggregations(selectAggregationByGroupId.get(groupId)); + selectPathPlans.put(groupId, subQueryPlan); + } + } + + /** + * Split aggregation plan with filter path + */ + private static void splitAggregationPlanWithFilter( + ClusterRpcSingleQueryManager singleQueryManager) + throws PathErrorException { + splitAggregationPlanBySelectPath(singleQueryManager); + splitQueryPlanByFilterPath(singleQueryManager); + } + + /** + * Split fill plan which only contain select paths. + */ + private static void splitFillPlan(ClusterRpcSingleQueryManager singleQueryManager) + throws PathErrorException { + FillQueryPlan fillQueryPlan = (FillQueryPlan) singleQueryManager.getOriginQueryPlan(); List<Path> selectPaths = fillQueryPlan.getPaths(); Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId(); Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans(); @@ -138,25 +222,13 @@ public class QueryPlanPartitionUtils { } } - private static void splitQueryPlan(QueryPlan queryPlan, - ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException { + /** + * Split query plan with filter + */ + private static void splitQueryPlanWithFilter(ClusterRpcSingleQueryManager singleQueryManager) + throws PathErrorException { splitQueryPlanBySelectPath(singleQueryManager); - // split query plan by filter path - Map<String, FilterGroupEntity> filterGroupEntityMap = singleQueryManager - .getFilterGroupEntityMap(); - IExpression expression = queryPlan.getExpression(); - ExpressionUtils.getAllExpressionSeries(expression, filterGroupEntityMap); - for (FilterGroupEntity filterGroupEntity : filterGroupEntityMap.values()) { - List<Path> filterSeriesList = filterGroupEntity.getFilterPaths(); - // create filter sub query plan - QueryPlan subQueryPlan = new QueryPlan(); - subQueryPlan.setPaths(filterSeriesList); - IExpression subExpression = ExpressionUtils - .pruneFilterTree(expression.clone(), filterSeriesList); - if (subExpression.getType() != ExpressionType.TRUE) { - subQueryPlan.setExpression(subExpression); - } - filterGroupEntity.setQueryPlan(subQueryPlan); - } + splitQueryPlanByFilterPath(singleQueryManager); } + } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java index 42a2e8b..920aeef 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java @@ -56,9 +56,7 @@ public interface IQueryProcessExecutor { QueryFilterOptimizationException, ProcessorException; /** - * process aggregate plan of qp layer, construct queryDataSet. <<<<<<< HEAD - * - * ======= >>>>>>> master + * process aggregate plan of qp layer, construct queryDataSet. */ QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression, QueryContext context) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java index 508a787..6c458a5 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.iotdb.db.query.executor; import java.io.IOException; @@ -28,7 +27,6 @@ import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.ProcessorException; import org.apache.iotdb.db.metadata.MManager; -import org.apache.iotdb.db.query.factory.AggreFuncFactory; import org.apache.iotdb.db.query.aggregation.AggreResultData; import org.apache.iotdb.db.query.aggregation.AggregateFunction; import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc; @@ -37,6 +35,7 @@ import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.dataset.AggreResultDataPointReader; import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator; +import org.apache.iotdb.db.query.factory.AggreFuncFactory; import org.apache.iotdb.db.query.factory.SeriesReaderFactory; import org.apache.iotdb.db.query.reader.IPointReader; import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; @@ -54,9 +53,10 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; public class AggregateEngineExecutor { - private List<Path> selectedSeries; - private List<String> aggres; - private IExpression expression; + protected List<Path> selectedSeries; + protected List<String> aggres; + protected IExpression expression; + protected List<TSDataType> dataTypes; /** * aggregation batch calculation size. @@ -72,6 +72,7 @@ public class AggregateEngineExecutor { this.aggres = aggres; this.expression = expression; this.aggregateFetchSize = 10 * IoTDBDescriptor.getInstance().getConfig().getFetchSize(); + this.dataTypes = new ArrayList<>(); } /** @@ -79,8 +80,19 @@ public class AggregateEngineExecutor { * * @param context query context */ - public QueryDataSet executeWithOutTimeGenerator(QueryContext context) + public QueryDataSet executeWithoutTimeGenerator(QueryContext context) throws FileNodeManagerException, IOException, PathErrorException, ProcessorException { + List<IPointReader> resultDataPointReaders = constructAggreReadersWithoutTimeGenerator(context); + return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, resultDataPointReaders); + } + + /** + * Construct aggregate readers with only time filter or no filter. + * + * @param context query context + */ + public List<IPointReader> constructAggreReadersWithoutTimeGenerator(QueryContext context) + throws FileNodeManagerException, PathErrorException, IOException, ProcessorException { Filter timeFilter = null; if (expression != null) { timeFilter = ((GlobalTimeExpression) expression).getFilter(); @@ -121,11 +133,17 @@ public class AggregateEngineExecutor { List<AggreResultData> aggreResultDataList = new ArrayList<>(); //TODO use multi-thread for (int i = 0; i < selectedSeries.size(); i++) { - AggreResultData aggreResultData = aggregateWithOutTimeGenerator(aggregateFunctions.get(i), + AggreResultData aggreResultData = aggregateWithoutTimeGenerator(aggregateFunctions.get(i), readersOfSequenceData.get(i), readersOfUnSequenceData.get(i), timeFilter); aggreResultDataList.add(aggreResultData); } - return constructDataSet(aggreResultDataList); + + List<IPointReader> resultDataPointReaders = new ArrayList<>(); + for (AggreResultData resultData : aggreResultDataList) { + dataTypes.add(resultData.getDataType()); + resultDataPointReaders.add(new AggreResultDataPointReader(resultData)); + } + return resultDataPointReaders; } /** @@ -137,7 +155,7 @@ public class AggregateEngineExecutor { * @param filter time filter or null * @return one series aggregate result data */ - private AggreResultData aggregateWithOutTimeGenerator(AggregateFunction function, + protected AggreResultData aggregateWithoutTimeGenerator(AggregateFunction function, SequenceDataReader sequenceReader, IPointReader unSequenceReader, Filter filter) throws IOException, ProcessorException { if (function instanceof MaxTimeAggrFunc || function instanceof LastAggrFunc) { @@ -256,6 +274,18 @@ public class AggregateEngineExecutor { */ public QueryDataSet executeWithTimeGenerator(QueryContext context) throws FileNodeManagerException, PathErrorException, IOException, ProcessorException { + List<IPointReader> resultDataPointReaders = constructAggreReadersWithTimeGenerator(context); + return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, resultDataPointReaders); + } + + + /** + * Construct aggregate readers with value filter. + * + * @param context query context + */ + private List<IPointReader> constructAggreReadersWithTimeGenerator(QueryContext context) + throws FileNodeManagerException, PathErrorException, IOException, ProcessorException { QueryResourceManager .getInstance().beginQueryOfGivenQueryPaths(context.getJobId(), selectedSeries); QueryResourceManager.getInstance().beginQueryOfGivenExpression(context.getJobId(), expression); @@ -271,12 +301,19 @@ public class AggregateEngineExecutor { function.init(); aggregateFunctions.add(function); } - List<AggreResultData> batchDataList = aggregateWithTimeGenerator(aggregateFunctions, + List<AggreResultData> aggreResultDataList = aggregateWithTimeGenerator(aggregateFunctions, timestampGenerator, readersOfSelectedSeries); - return constructDataSet(batchDataList); + + List<IPointReader> resultDataPointReaders = new ArrayList<>(); + for (AggreResultData resultData : aggreResultDataList) { + dataTypes.add(resultData.getDataType()); + resultDataPointReaders.add(new AggreResultDataPointReader(resultData)); + } + return resultDataPointReaders; } + /** * calculation aggregate result with value filter. */ @@ -312,19 +349,7 @@ public class AggregateEngineExecutor { return aggreResultDataArrayList; } - /** - * using aggregate result data list construct QueryDataSet. - * - * @param aggreResultDataList aggregate result data list - */ - private QueryDataSet constructDataSet(List<AggreResultData> aggreResultDataList) - throws IOException { - List<TSDataType> dataTypes = new ArrayList<>(); - List<IPointReader> resultDataPointReaders = new ArrayList<>(); - for (AggreResultData resultData : aggreResultDataList) { - dataTypes.add(resultData.getDataType()); - resultDataPointReaders.add(new AggreResultDataPointReader(resultData)); - } - return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, resultDataPointReaders); + public List<TSDataType> getDataTypes() { + return dataTypes; } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java index 03c600d..32fcbf7 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java @@ -90,14 +90,14 @@ public class EngineQueryRouter implements IEngineQueryRouter{ AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor( selectedSeries, aggres, optimizedExpression); if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) { - return engineExecutor.executeWithOutTimeGenerator(context); + return engineExecutor.executeWithoutTimeGenerator(context); } else { return engineExecutor.executeWithTimeGenerator(context); } } else { AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor( selectedSeries, aggres, null); - return engineExecutor.executeWithOutTimeGenerator(context); + return engineExecutor.executeWithoutTimeGenerator(context); } }
