This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch aggrVector2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0cc1700716e4693090ddf652c7aef3e8a5c6c71d Author: Alima777 <[email protected]> AuthorDate: Thu Sep 16 13:15:09 2021 +0800 move querycontext to class parameter --- .../iotdb/cluster/query/ClusterQueryRouter.java | 5 +- .../query/aggregate/ClusterAggregateExecutor.java | 30 ++++--- .../query/ClusterAggregateExecutorTest.java | 8 +- .../db/query/executor/AggregationExecutor.java | 96 +++++++++++----------- .../iotdb/db/query/executor/QueryRouter.java | 11 +-- 5 files changed, 79 insertions(+), 71 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java index e3be92c..eb45353 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java @@ -78,8 +78,9 @@ public class ClusterQueryRouter extends QueryRouter { } @Override - protected AggregationExecutor getAggregationExecutor(AggregationPlan aggregationPlan) { - return new ClusterAggregateExecutor(aggregationPlan, metaGroupMember); + protected AggregationExecutor getAggregationExecutor( + QueryContext context, AggregationPlan aggregationPlan) { + return new ClusterAggregateExecutor(context, aggregationPlan, metaGroupMember); } @Override diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java index a007ee9..af12e5a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java @@ -37,7 +37,6 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Set; public class ClusterAggregateExecutor extends AggregationExecutor { @@ -51,8 +50,9 @@ public class ClusterAggregateExecutor extends AggregationExecutor { * * @param aggregationPlan */ - public ClusterAggregateExecutor(AggregationPlan aggregationPlan, MetaGroupMember metaMember) { - super(aggregationPlan); + public ClusterAggregateExecutor( + QueryContext context, AggregationPlan aggregationPlan, MetaGroupMember metaMember) { + super(context, aggregationPlan); this.metaMember = metaMember; this.readerFactory = new ClusterReaderFactory(metaMember); this.aggregator = new ClusterAggregator(metaMember); @@ -60,24 +60,28 @@ public class ClusterAggregateExecutor extends AggregationExecutor { @Override protected void aggregateOneSeries( - Map.Entry<PartialPath, List<Integer>> pathToAggrIndexes, - AggregateResult[] aggregateResultList, - Set<String> measurements, - Filter timeFilter, - QueryContext context) + PartialPath seriesPath, + List<Integer> indexes, + Set<String> allMeasurementsInDevice, + Filter timeFilter) throws StorageEngineException { - PartialPath seriesPath = pathToAggrIndexes.getKey(); - TSDataType tsDataType = dataTypes.get(pathToAggrIndexes.getValue().get(0)); + TSDataType tsDataType = dataTypes.get(indexes.get(0)); List<String> aggregationNames = new ArrayList<>(); - for (int i : pathToAggrIndexes.getValue()) { + for (int i : indexes) { aggregationNames.add(aggregations.get(i)); } List<AggregateResult> aggregateResult = aggregator.getAggregateResult( - seriesPath, measurements, aggregationNames, tsDataType, timeFilter, context, ascending); + seriesPath, + allMeasurementsInDevice, + aggregationNames, + tsDataType, + timeFilter, + context, + ascending); int rstIndex = 0; - for (int i : pathToAggrIndexes.getValue()) { + for (int i : indexes) { aggregateResultList[i] = aggregateResult.get(rstIndex++); } } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java index 7d18a00..ffa461e 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java @@ -87,8 +87,8 @@ public class ClusterAggregateExecutorTest extends BaseQueryTest { QueryContext context = new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true)); try { - executor = new ClusterAggregateExecutor(plan, testMetaMember); - QueryDataSet queryDataSet = executor.executeWithoutValueFilter(context, plan); + executor = new ClusterAggregateExecutor(context, plan, testMetaMember); + QueryDataSet queryDataSet = executor.executeWithoutValueFilter(plan); assertTrue(queryDataSet.hasNext()); RowRecord record = queryDataSet.next(); List<Field> fields = record.getFields(); @@ -144,8 +144,8 @@ public class ClusterAggregateExecutorTest extends BaseQueryTest { QueryContext context = new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true)); try { - executor = new ClusterAggregateExecutor(plan, testMetaMember); - QueryDataSet queryDataSet = executor.executeWithValueFilter(context, plan); + executor = new ClusterAggregateExecutor(context, plan, testMetaMember); + QueryDataSet queryDataSet = executor.executeWithValueFilter(plan); assertTrue(queryDataSet.hasNext()); RowRecord record = queryDataSet.next(); List<Field> fields = record.getFields(); diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java index bc970ff..871915a 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java @@ -73,26 +73,25 @@ public class AggregationExecutor { protected List<String> aggregations; protected IExpression expression; protected boolean ascending; + protected QueryContext context; + protected AggregateResult[] aggregateResultList; /** aggregation batch calculation size. */ private int aggregateFetchSize; - protected AggregationExecutor(AggregationPlan aggregationPlan) { + protected AggregationExecutor(QueryContext context, AggregationPlan aggregationPlan) { this.selectedSeries = aggregationPlan.getDeduplicatedPaths(); this.dataTypes = aggregationPlan.getDeduplicatedDataTypes(); this.aggregations = aggregationPlan.getDeduplicatedAggregations(); this.expression = aggregationPlan.getExpression(); this.aggregateFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize(); this.ascending = aggregationPlan.isAscending(); + this.context = context; + this.aggregateResultList = new AggregateResult[selectedSeries.size()]; } - /** - * execute aggregate function with only time filter or no filter. - * - * @param context query context - */ - public QueryDataSet executeWithoutValueFilter( - QueryContext context, AggregationPlan aggregationPlan) + /** execute aggregate function with only time filter or no filter. */ + public QueryDataSet executeWithoutValueFilter(AggregationPlan aggregationPlan) throws StorageEngineException, IOException, QueryProcessException { Filter timeFilter = null; @@ -103,22 +102,30 @@ public class AggregationExecutor { // TODO use multi-thread Map<PartialPath, List<Integer>> pathToAggrIndexesMap = groupAggregationsBySeries(selectedSeries); - // Attention: this method will REMOVE vector path from pathToAggrIndexesMap - Map<PartialPath, Map<String, List<Integer>>> vectorPathIndexesMap = - groupVectorSeries(pathToAggrIndexesMap); - AggregateResult[] aggregateResultList = new AggregateResult[selectedSeries.size()]; - // TODO-Cluster: group the paths by storage group to reduce communications List<StorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet())); + + // Attention: this method will REMOVE vector path from pathToAggrIndexesMap + Map<PartialPath, Map<String, List<Integer>>> vectorPathIndexesMap = + groupVectorSeries(pathToAggrIndexesMap); try { for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) { + PartialPath seriesPath = entry.getKey(); aggregateOneSeries( - entry, - aggregateResultList, - aggregationPlan.getAllMeasurementsInDevice(entry.getKey().getDevice()), - timeFilter, - context); + seriesPath, + entry.getValue(), + aggregationPlan.getAllMeasurementsInDevice(seriesPath.getDevice()), + timeFilter); + } + for (Map.Entry<PartialPath, Map<String, List<Integer>>> entry : + vectorPathIndexesMap.entrySet()) { + VectorPartialPath vectorSeries = (VectorPartialPath) entry.getKey(); + aggregateOneVectorSeries( + vectorSeries, + entry.getValue(), + aggregationPlan.getAllMeasurementsInDevice(vectorSeries.getDevice()), + timeFilter); } } finally { StorageEngine.getInstance().mergeUnLock(list); @@ -130,25 +137,21 @@ public class AggregationExecutor { /** * get aggregation result for one series * - * @param pathToAggrIndexes entry of path to aggregation indexes map * @param timeFilter time filter - * @param context query context */ protected void aggregateOneSeries( - Map.Entry<PartialPath, List<Integer>> pathToAggrIndexes, - AggregateResult[] aggregateResultList, - Set<String> measurements, - Filter timeFilter, - QueryContext context) + PartialPath seriesPath, + List<Integer> indexes, + Set<String> allMeasurementsInDevice, + Filter timeFilter) throws IOException, QueryProcessException, StorageEngineException { List<AggregateResult> ascAggregateResultList = new ArrayList<>(); List<AggregateResult> descAggregateResultList = new ArrayList<>(); boolean[] isAsc = new boolean[aggregateResultList.length]; - PartialPath seriesPath = pathToAggrIndexes.getKey(); - TSDataType tsDataType = dataTypes.get(pathToAggrIndexes.getValue().get(0)); + TSDataType tsDataType = dataTypes.get(indexes.get(0)); - for (int i : pathToAggrIndexes.getValue()) { + for (int i : indexes) { // construct AggregateResult AggregateResult aggregateResult = AggregateResultFactory.getAggrResultByName(aggregations.get(i), tsDataType); @@ -161,7 +164,7 @@ public class AggregationExecutor { } aggregateOneSeries( seriesPath, - measurements, + allMeasurementsInDevice, context, timeFilter, tsDataType, @@ -171,7 +174,7 @@ public class AggregationExecutor { int ascIndex = 0; int descIndex = 0; - for (int i : pathToAggrIndexes.getValue()) { + for (int i : indexes) { aggregateResultList[i] = isAsc[i] ? ascAggregateResultList.get(ascIndex++) @@ -179,6 +182,13 @@ public class AggregationExecutor { } } + protected void aggregateOneVectorSeries( + PartialPath seriesPath, + Map<String, List<Integer>> subIndexes, + Set<String> allMeasurementsInDevice, + Filter timeFilter) + throws IOException, QueryProcessException, StorageEngineException {} + @SuppressWarnings("squid:S107") public static void aggregateOneSeries( PartialPath seriesPath, @@ -338,12 +348,8 @@ public class AggregationExecutor { return remainingToCalculate; } - /** - * execute aggregate function with value filter. - * - * @param context query context. - */ - public QueryDataSet executeWithValueFilter(QueryContext context, AggregationPlan queryPlan) + /** execute aggregate function with value filter. */ + public QueryDataSet executeWithValueFilter(AggregationPlan queryPlan) throws StorageEngineException, IOException, QueryProcessException { optimizeLastElementFunc(queryPlan); @@ -367,15 +373,13 @@ public class AggregationExecutor { StorageEngine.getInstance().mergeUnLock(list); } - List<AggregateResult> aggregateResults = new ArrayList<>(); for (int i = 0; i < selectedSeries.size(); i++) { - AggregateResult result = + aggregateResultList[i] = AggregateResultFactory.getAggrResultByName( aggregations.get(i), dataTypes.get(i), ascending); - aggregateResults.add(result); } - aggregateWithValueFilter(aggregateResults, timestampGenerator, readerToAggrIndexesMap); - return constructDataSet(aggregateResults, queryPlan); + aggregateWithValueFilter(timestampGenerator, readerToAggrIndexesMap); + return constructDataSet(Arrays.asList(aggregateResultList), queryPlan); } private void optimizeLastElementFunc(QueryPlan queryPlan) { @@ -413,7 +417,6 @@ public class AggregationExecutor { /** calculate aggregation result with value filter. */ private void aggregateWithValueFilter( - List<AggregateResult> aggregateResults, TimeGenerator timestampGenerator, Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap) throws IOException { @@ -440,17 +443,16 @@ public class AggregationExecutor { if (cached.get(pathId)) { Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId)); for (Integer i : entry.getValue()) { - aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values); + aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values); } } else { if (entry.getValue().size() == 1) { - aggregateResults - .get(entry.getValue().get(0)) - .updateResultUsingTimestamps(timeArray, timeArrayLength, entry.getKey()); + aggregateResultList[entry.getValue().get(0)].updateResultUsingTimestamps( + timeArray, timeArrayLength, entry.getKey()); } else { Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength); for (Integer i : entry.getValue()) { - aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values); + aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java index d2ae91f..a134d8e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java @@ -139,22 +139,23 @@ public class QueryRouter implements IQueryRouter { aggregationPlan.setExpression(optimizedExpression); - AggregationExecutor engineExecutor = getAggregationExecutor(aggregationPlan); + AggregationExecutor engineExecutor = getAggregationExecutor(context, aggregationPlan); QueryDataSet dataSet = null; if (optimizedExpression != null && optimizedExpression.getType() != ExpressionType.GLOBAL_TIME) { - dataSet = engineExecutor.executeWithValueFilter(context, aggregationPlan); + dataSet = engineExecutor.executeWithValueFilter(aggregationPlan); } else { - dataSet = engineExecutor.executeWithoutValueFilter(context, aggregationPlan); + dataSet = engineExecutor.executeWithoutValueFilter(aggregationPlan); } return dataSet; } - protected AggregationExecutor getAggregationExecutor(AggregationPlan aggregationPlan) { - return new AggregationExecutor(aggregationPlan); + protected AggregationExecutor getAggregationExecutor( + QueryContext context, AggregationPlan aggregationPlan) { + return new AggregationExecutor(context, aggregationPlan); } @Override
