This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 2bb0a0419b8da24ca5d923bbb486dd8e37ab67ad Author: lta <[email protected]> AuthorDate: Tue May 21 15:47:54 2019 +0800 add group by features --- .../ClusterGroupByDataSetWithOnlyTimeFilter.java | 154 +++++++++++++++++++++ .../ClusterGroupByDataSetWithTimeGenerator.java | 91 ++++++++++++ .../executor/ClusterAggregateEngineExecutor.java | 6 +- .../cluster/query/executor/ClusterQueryRouter.java | 62 ++++++++- .../querynode/ClusterLocalSingleQueryManager.java | 59 +++++++- .../ClusterFilterSeriesBatchReaderEntity.java | 2 +- ...lusterGroupBySelectSeriesBatchReaderEntity.java | 79 +++++++++++ .../querynode/ClusterSelectSeriesBatchReader.java | 4 +- .../ClusterSelectSeriesBatchReaderEntity.java | 7 +- ...y.java => IClusterSeriesBatchReaderEntity.java} | 2 +- .../query/utils/QueryPlanPartitionUtils.java | 38 ++++- .../iotdb/db/qp/executor/QueryProcessExecutor.java | 5 +- .../groupby/GroupByWithOnlyTimeFilterDataSet.java | 5 +- .../groupby/GroupByWithValueFilterDataSet.java | 4 +- ...neQueryRouter.java => AbstractQueryRouter.java} | 52 ++++++- .../iotdb/db/query/executor/EngineQueryRouter.java | 42 +----- 16 files changed, 536 insertions(+), 76 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java new file mode 100644 index 0000000..98460e4 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.query.dataset; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; +import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity; +import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader; +import org.apache.iotdb.cluster.utils.QPExecutorUtils; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.exception.FileNodeManagerException; +import org.apache.iotdb.db.exception.PathErrorException; +import org.apache.iotdb.db.exception.ProcessorException; +import org.apache.iotdb.db.query.aggregation.AggreResultData; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.QueryResourceManager; +import org.apache.iotdb.db.query.dataset.groupby.GroupByWithOnlyTimeFilterDataSet; +import org.apache.iotdb.db.query.factory.SeriesReaderFactory; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader; +import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader; +import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.read.expression.IExpression; +import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; +import org.apache.iotdb.tsfile.utils.Pair; + +/** + * Handle group by query with only time filter + */ +public class ClusterGroupByDataSetWithOnlyTimeFilter extends GroupByWithOnlyTimeFilterDataSet { + + private ClusterRpcSingleQueryManager queryManager; + private List<IPointReader> readersOfSelectedSeries; + + /** + * constructor. + */ + public ClusterGroupByDataSetWithOnlyTimeFilter(long jobId, + List<Path> paths, long unit, long origin, + List<Pair<Long, Long>> mergedIntervals, ClusterRpcSingleQueryManager queryManager) { + super(jobId, paths, unit, origin, mergedIntervals); + this.queryManager =queryManager; + this.readersOfSelectedSeries = new ArrayList<>(); + } + + + /** + * init reader and aggregate function. + */ + public void initGroupBy(QueryContext context, List<String> aggres, IExpression expression) + throws FileNodeManagerException, PathErrorException, ProcessorException, IOException { + initAggreFuction(aggres); + + /** add query token for query series which can handle locally **/ + List<Path> localQuerySeries = new ArrayList<>(selectedSeries); + Set<Path> remoteQuerySeries = new HashSet<>(); + queryManager.getSelectSeriesGroupEntityMap().values().forEach( + selectSeriesGroupEntity -> remoteQuerySeries + .addAll(selectSeriesGroupEntity.getSelectPaths())); + localQuerySeries.removeAll(remoteQuerySeries); + QueryResourceManager.getInstance() + .beginQueryOfGivenQueryPaths(context.getJobId(), localQuerySeries); + if (expression != null) { + timeFilter = ((GlobalTimeExpression) expression).getFilter(); + } + + Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = queryManager + .getSelectSeriesGroupEntityMap(); + //Mark filter series reader index group by group id + Map<String, Integer> selectSeriesReaderIndex = new HashMap<>(); + for (int i = 0; i < selectedSeries.size(); i++) { + Path path = selectedSeries.get(i); + String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice()); + if (selectSeriesGroupEntityMap.containsKey(groupId)) { + int index = selectSeriesReaderIndex.getOrDefault(groupId, 0); + ClusterSelectSeriesReader reader = selectSeriesGroupEntityMap.get(groupId) + .getSelectSeriesReaders().get(index); + readersOfSelectedSeries.add(reader); + selectSeriesReaderIndex.put(groupId, index + 1); + } else { + readersOfSelectedSeries.add(null); + QueryDataSource queryDataSource = QueryResourceManager.getInstance() + .getQueryDataSource(selectedSeries.get(i), context); + + // sequence reader for sealed tsfile, unsealed tsfile, memory + SequenceDataReader sequenceReader = new SequenceDataReader( + queryDataSource.getSeqDataSource(), + timeFilter, context, false); + + // unseq reader for all chunk groups in unSeqFile, memory + PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance() + .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter); + + sequenceReaderList.add(sequenceReader); + unSequenceReaderList.add(unSeqMergeReader); + } + } + } + + @Override + public RowRecord next() throws IOException { + if (!hasCachedTimeInterval) { + throw new IOException("need to call hasNext() before calling next() " + + "in GroupByWithOnlyTimeFilterDataSet."); + } + hasCachedTimeInterval = false; + RowRecord record = new RowRecord(startTime); + for (int i = 0; i < functions.size(); i++) { + IPointReader reader = readersOfSelectedSeries.get(i); + if(reader != null){ + TimeValuePair timeValuePair = reader.next(); + record.addField(getField(timeValuePair.getValue().getValue(), dataTypes.get(i))); + }else { + AggreResultData res; + try { + res = nextSeries(i); + } catch (ProcessorException e) { + throw new IOException(e); + } + if (res == null) { + record.addField(new Field(null)); + } else { + record.addField(getField(res)); + } + } + } + return record; + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java new file mode 100644 index 0000000..00f2d88 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.query.dataset; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory; +import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; +import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity; +import org.apache.iotdb.cluster.query.timegenerator.ClusterTimeGenerator; +import org.apache.iotdb.db.exception.FileNodeManagerException; +import org.apache.iotdb.db.exception.PathErrorException; +import org.apache.iotdb.db.exception.ProcessorException; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.QueryResourceManager; +import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.expression.IExpression; +import org.apache.iotdb.tsfile.utils.Pair; + +public class ClusterGroupByDataSetWithTimeGenerator extends GroupByWithValueFilterDataSet { + + private ClusterRpcSingleQueryManager queryManager; + + private List<TSDataType> selectSeriesDataTypes; + + /** + * constructor. + */ + public ClusterGroupByDataSetWithTimeGenerator(long jobId, + List<Path> paths, long unit, long origin, + List<Pair<Long, Long>> mergedIntervals, ClusterRpcSingleQueryManager queryManager) { + super(jobId, paths, unit, origin, mergedIntervals); + this.queryManager = queryManager; + selectSeriesDataTypes = new ArrayList<>(); + } + + /** + * init reader and aggregate function. + */ + @Override + public void initGroupBy(QueryContext context, List<String> aggres, IExpression expression) + throws FileNodeManagerException, PathErrorException, ProcessorException, IOException { + initAggreFuction(aggres); + + /** add query token for filter series which can handle locally **/ + Set<String> deviceIdSet = new HashSet<>(); + for (FilterSeriesGroupEntity filterSeriesGroupEntity : queryManager + .getFilterSeriesGroupEntityMap().values()) { + List<Path> remoteFilterSeries = filterSeriesGroupEntity.getFilterPaths(); + remoteFilterSeries.forEach(seriesPath -> deviceIdSet.add(seriesPath.getDevice())); + } + QueryResourceManager.getInstance() + .beginQueryOfGivenExpression(context.getJobId(), expression, deviceIdSet); + + /** add query token for query series which can handle locally **/ + List<Path> localQuerySeries = new ArrayList<>(selectedSeries); + Set<Path> remoteQuerySeries = new HashSet<>(); + queryManager.getSelectSeriesGroupEntityMap().values().forEach( + selectSeriesGroupEntity -> remoteQuerySeries + .addAll(selectSeriesGroupEntity.getSelectPaths())); + localQuerySeries.removeAll(remoteQuerySeries); + QueryResourceManager.getInstance() + .beginQueryOfGivenQueryPaths(context.getJobId(), localQuerySeries); + + this.timestampGenerator = new ClusterTimeGenerator(expression, context, queryManager); + this.allDataReaderList = ClusterSeriesReaderFactory + .createReadersByTimestampOfSelectedPaths(selectedSeries, context, queryManager, + selectSeriesDataTypes); + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java index 2cf4e87..808eab8 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java @@ -25,8 +25,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.iotdb.cluster.config.ClusterConfig; -import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.exception.RaftConnectionException; import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; @@ -151,8 +149,8 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor { List<Path> localQuerySeries = new ArrayList<>(selectedSeries); Set<Path> remoteQuerySeries = new HashSet<>(); queryManager.getSelectSeriesGroupEntityMap().values().forEach( - selectSeriesGroupEntity -> selectSeriesGroupEntity.getSelectPaths() - .forEach(path -> remoteQuerySeries.add(path))); + selectSeriesGroupEntity -> remoteQuerySeries + .addAll(selectSeriesGroupEntity.getSelectPaths())); localQuerySeries.removeAll(remoteQuerySeries); QueryResourceManager.getInstance() .beginQueryOfGivenQueryPaths(context.getJobId(), localQuerySeries); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java index 5d6a342..f9d32f5 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java @@ -23,14 +23,16 @@ import java.util.List; import java.util.Map; import org.apache.iotdb.cluster.exception.RaftConnectionException; import org.apache.iotdb.cluster.query.QueryType; +import org.apache.iotdb.cluster.query.dataset.ClusterGroupByDataSetWithOnlyTimeFilter; +import org.apache.iotdb.cluster.query.dataset.ClusterGroupByDataSetWithTimeGenerator; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcQueryManager; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.ProcessorException; import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.executor.AbstractQueryRouter; import org.apache.iotdb.db.query.executor.AggregateEngineExecutor; -import org.apache.iotdb.db.query.executor.IEngineQueryRouter; import org.apache.iotdb.db.query.fill.IFill; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -38,7 +40,10 @@ import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.expression.ExpressionType; import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.read.expression.QueryExpression; +import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression; +import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer; +import org.apache.iotdb.tsfile.read.filter.TimeFilter; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.utils.Pair; @@ -46,7 +51,7 @@ import org.apache.iotdb.tsfile.utils.Pair; * Query entrance class of cluster query process. All query clause will be transformed to physical * plan, physical plan will be executed by ClusterQueryRouter. */ -public class ClusterQueryRouter implements IEngineQueryRouter { +public class ClusterQueryRouter extends AbstractQueryRouter { /** * Consistency level of reading data @@ -128,7 +133,55 @@ public class ClusterQueryRouter implements IEngineQueryRouter { IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext context) throws ProcessorException, QueryFilterOptimizationException, FileNodeManagerException, PathErrorException, IOException { - throw new UnsupportedOperationException(); + + long jobId = context.getJobId(); + ClusterRpcSingleQueryManager queryManager = ClusterRpcQueryManager.getInstance() + .getSingleQuery(jobId); + + //check the legitimacy of intervals + checkIntervals(intervals); + + // merge intervals + List<Pair<Long, Long>> mergedIntervalList = mergeInterval(intervals); + + // construct groupBy intervals filter + BinaryExpression intervalFilter = null; + for (Pair<Long, Long> pair : mergedIntervalList) { + BinaryExpression pairFilter = BinaryExpression + .and(new GlobalTimeExpression(TimeFilter.gtEq(pair.left)), + new GlobalTimeExpression(TimeFilter.ltEq(pair.right))); + if (intervalFilter != null) { + intervalFilter = BinaryExpression.or(intervalFilter, pairFilter); + } else { + intervalFilter = pairFilter; + } + } + + // merge interval filter and filtering conditions after where statements + if (expression == null) { + expression = intervalFilter; + } else { + expression = BinaryExpression.and(expression, intervalFilter); + } + + IExpression optimizedExpression = ExpressionOptimizer.getInstance() + .optimize(expression, selectedSeries); + try { + if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) { + ClusterGroupByDataSetWithOnlyTimeFilter groupByEngine = new ClusterGroupByDataSetWithOnlyTimeFilter( + jobId, selectedSeries, unit, origin, mergedIntervalList, queryManager); + groupByEngine.initGroupBy(context, aggres, optimizedExpression); + return groupByEngine; + } else { + queryManager.initQueryResource(QueryType.FILTER, getReadDataConsistencyLevel()); + ClusterGroupByDataSetWithTimeGenerator groupByEngine = new ClusterGroupByDataSetWithTimeGenerator( + jobId, selectedSeries, unit, origin, mergedIntervalList, queryManager); + groupByEngine.initGroupBy(context, aggres, optimizedExpression); + return groupByEngine; + } + } catch (RaftConnectionException e) { + throw new FileNodeManagerException(e); + } } @Override @@ -139,7 +192,8 @@ public class ClusterQueryRouter implements IEngineQueryRouter { try { queryManager.initQueryResource(QueryType.NO_FILTER, getReadDataConsistencyLevel()); - ClusterFillEngineExecutor fillEngineExecutor = new ClusterFillEngineExecutor(fillPaths, queryTime, + ClusterFillEngineExecutor fillEngineExecutor = new ClusterFillEngineExecutor(fillPaths, + queryTime, fillType, queryManager); return fillEngineExecutor.execute(context); } catch (IOException | RaftConnectionException e) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java index 8799be2..25adbf5 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java @@ -31,10 +31,10 @@ import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory; import org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterSelectSeriesBatchReader; import org.apache.iotdb.cluster.query.reader.querynode.ClusterFillSelectSeriesBatchReader; import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReaderEntity; +import org.apache.iotdb.cluster.query.reader.querynode.ClusterGroupBySelectSeriesBatchReaderEntity; import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader; import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderByTimestamp; import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderEntity; -import org.apache.iotdb.cluster.query.reader.querynode.IClusterFilterSeriesBatchReaderEntity; import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest; import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest; import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest; @@ -54,6 +54,7 @@ import org.apache.iotdb.db.qp.physical.crud.GroupByPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; +import org.apache.iotdb.db.query.dataset.groupby.GroupByWithOnlyTimeFilterDataSet; import org.apache.iotdb.db.query.executor.AbstractExecutorWithoutTimeGenerator; import org.apache.iotdb.db.query.executor.AggregateEngineExecutor; import org.apache.iotdb.db.query.fill.IFill; @@ -105,9 +106,14 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM private ClusterSelectSeriesBatchReaderEntity selectReaderEntity; /** + * Select reader entity of group by query, which handle group by query with only time filter + */ + private ClusterGroupBySelectSeriesBatchReaderEntity groupBySelectReaderEntity; + + /** * Filter reader entity */ - private IClusterFilterSeriesBatchReaderEntity filterReaderEntity; + private ClusterFilterSeriesBatchReaderEntity filterReaderEntity; /** * Key is series full path, value is data type of series @@ -146,7 +152,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM selectReaderEntity = new ClusterSelectSeriesBatchReaderEntity(); QueryPlan plan = queryPlanMap.get(PathType.SELECT_PATH); if (plan instanceof GroupByPlan) { - throw new UnsupportedOperationException(); + handleGroupBySeriesReader(plan, context, response); } else if (plan instanceof AggregationPlan) { handleAggreSeriesReader(plan, context, response); } else if (plan instanceof FillQueryPlan) { @@ -206,6 +212,43 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM * * @param queryPlan fill query plan */ + private void handleGroupBySeriesReader(QueryPlan queryPlan, QueryContext context, + InitSeriesReaderResponse response) + throws FileNodeManagerException, PathErrorException, IOException, ProcessorException, QueryFilterOptimizationException { + if (queryPlan.getExpression() == null + || queryPlan.getExpression().getType() == ExpressionType.GLOBAL_TIME) { + handleGroupBySeriesReaderWithoutTimeGenerator(queryPlan, context, response); + } else { + handleSelectReaderWithTimeGenerator(queryPlan, context, response); + } + } + + + /** + * Handle aggregation series reader without value filter + * + * @param queryPlan fill query plan + */ + private void handleGroupBySeriesReaderWithoutTimeGenerator(QueryPlan queryPlan, + QueryContext context, + InitSeriesReaderResponse response) + throws FileNodeManagerException, PathErrorException, IOException, ProcessorException, QueryFilterOptimizationException { + QueryDataSet queryDataSet = queryProcessExecutor.processQuery(queryPlan, context); + List<Path> paths = queryDataSet.getPaths(); + List<TSDataType> dataTypes = queryDataSet.getDataTypes(); + for (int i = 0; i < paths.size(); i++) { + dataTypeMap.put(paths.get(i).getFullPath(), dataTypes.get(i)); + } + groupBySelectReaderEntity = new ClusterGroupBySelectSeriesBatchReaderEntity(paths, dataTypes, + (GroupByWithOnlyTimeFilterDataSet) queryDataSet); + response.getSeriesDataTypes().put(PathType.SELECT_PATH, dataTypes); + } + + /** + * Handle aggregation series reader + * + * @param queryPlan fill query plan + */ private void handleAggreSeriesReader(QueryPlan queryPlan, QueryContext context, InitSeriesReaderResponse response) throws FileNodeManagerException, PathErrorException, IOException, ProcessorException { @@ -304,8 +347,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM private void handleFilterSeriesReader(QueryPlan plan, QueryContext context, InitSeriesReaderRequest request, InitSeriesReaderResponse response, PathType pathType) throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException, ProcessorException, IOException, ClassNotFoundException { - QueryDataSet queryDataSet = queryProcessExecutor - .processQuery(plan, context); + QueryDataSet queryDataSet = queryProcessExecutor.processQuery(plan, context); List<Path> paths = plan.getPaths(); List<TSDataType> dataTypes = queryDataSet.getDataTypes(); for (int i = 0; i < paths.size(); i++) { @@ -353,7 +395,10 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM PathType pathType = request.getPathType(); List<BatchData> batchDataList; if (pathType == PathType.SELECT_PATH) { - batchDataList = readSelectSeriesBatchData(request.getSeriesPathIndexs()); + // check whether it's a group by query with only time filter + batchDataList = + groupBySelectReaderEntity != null ? groupBySelectReaderEntity.nextBatchList() + : readSelectSeriesBatchData(request.getSeriesPathIndexs()); } else { batchDataList = readFilterSeriesBatchData(); } @@ -435,7 +480,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM return selectReaderEntity; } - public IClusterFilterSeriesBatchReaderEntity getFilterReaderEntity() { + public ClusterFilterSeriesBatchReaderEntity getFilterReaderEntity() { return filterReaderEntity; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java index 65f8c1c..ddcb35d 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java @@ -34,7 +34,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; /** * Batch reader entity for all filter paths. */ -public class ClusterFilterSeriesBatchReaderEntity implements IClusterFilterSeriesBatchReaderEntity { +public class ClusterFilterSeriesBatchReaderEntity implements IClusterSeriesBatchReaderEntity { private List<Path> allFilterPath; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java new file mode 100644 index 0000000..3b7fabe --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.query.reader.querynode; + +import static org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader.CLUSTER_CONF; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.db.query.dataset.groupby.GroupByWithOnlyTimeFilterDataSet; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.common.RowRecord; + +public class ClusterGroupBySelectSeriesBatchReaderEntity implements + IClusterSeriesBatchReaderEntity { + + private List<Path> paths; + private List<TSDataType> dataTypes; + + private GroupByWithOnlyTimeFilterDataSet queryDataSet; + + public ClusterGroupBySelectSeriesBatchReaderEntity( + List<Path> paths, + List<TSDataType> dataTypes, + GroupByWithOnlyTimeFilterDataSet queryDataSet) { + this.paths = paths; + this.dataTypes = dataTypes; + this.queryDataSet = queryDataSet; + } + + @Override + public boolean hasNext() throws IOException { + return queryDataSet.hasNext(); + } + + @Override + public List<BatchData> nextBatchList() throws IOException { + List<BatchData> batchDataList = new ArrayList<>(paths.size()); + for (int i = 0; i < paths.size(); i++) { + batchDataList.add(new BatchData(dataTypes.get(i), true)); + } + int dataPointCount = 0; + while (true) { + if (!hasNext() || dataPointCount == CLUSTER_CONF.getBatchReadSize()) { + break; + } + dataPointCount++; + RowRecord rowRecord = queryDataSet.next(); + long time = rowRecord.getTimestamp(); + List<Field> fieldList = rowRecord.getFields(); + for (int j = 0; j < paths.size(); j++) { + BatchData batchData = batchDataList.get(j); + Object value = fieldList.get(j).getObjectValue(dataTypes.get(j)); + batchData.putTime(time); + batchData.putAnObject(value); + } + } + return batchDataList; + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java index cfc43b8..28b6346 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java @@ -51,6 +51,9 @@ public class ClusterSelectSeriesBatchReader extends this.reader = reader; } + public ClusterSelectSeriesBatchReader() { + } + @Override public boolean hasNext() throws IOException { return reader.hasNext(); @@ -62,7 +65,6 @@ public class ClusterSelectSeriesBatchReader extends for (int i = 0; i < CLUSTER_CONF.getBatchReadSize(); i++) { if (hasNext()) { TimeValuePair pair = reader.next(); - System.out.println("reader value:" + pair); batchData.putTime(pair.getTimestamp()); batchData.putAnObject(pair.getValue().getValue()); } else { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java index f0dea38..484c423 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java @@ -25,15 +25,16 @@ import java.util.List; * Batch reader entity for all select paths. */ public class ClusterSelectSeriesBatchReaderEntity { + /** * All select paths */ - List<String> paths; + private List<String> paths; /** * All select readers */ - List<AbstractClusterSelectSeriesBatchReader> readers; + private List<AbstractClusterSelectSeriesBatchReader> readers; public ClusterSelectSeriesBatchReaderEntity() { paths = new ArrayList<>(); @@ -52,7 +53,7 @@ public class ClusterSelectSeriesBatchReaderEntity { return readers; } - public AbstractClusterSelectSeriesBatchReader getReaderByIndex(int index){ + public AbstractClusterSelectSeriesBatchReader getReaderByIndex(int index) { return readers.get(index); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterSeriesBatchReaderEntity.java similarity index 95% rename from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReaderEntity.java rename to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterSeriesBatchReaderEntity.java index a045e2a..e6e9e86 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReaderEntity.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterSeriesBatchReaderEntity.java @@ -25,7 +25,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData; /** * Batch reader for filter series which is used in query node. */ -public interface IClusterFilterSeriesBatchReaderEntity { +public interface IClusterSeriesBatchReaderEntity { boolean hasNext() throws IOException; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java index 3a2746f..03e4e7f 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java @@ -55,10 +55,10 @@ public class QueryPlanPartitionUtils { QueryPlan queryPLan = singleQueryManager.getOriginQueryPlan(); if (queryPLan instanceof FillQueryPlan) { splitFillPlan(singleQueryManager); - } else if (queryPLan instanceof AggregationPlan) { - splitAggregationPlanBySelectPath(singleQueryManager); } else if (queryPLan instanceof GroupByPlan) { splitGroupByPlanBySelectPath(singleQueryManager); + } else if (queryPLan instanceof AggregationPlan) { + splitAggregationPlanBySelectPath(singleQueryManager); } else { splitQueryPlanBySelectPath(singleQueryManager); } @@ -136,8 +136,38 @@ public class QueryPlanPartitionUtils { * Split group by plan by select path */ private static void splitGroupByPlanBySelectPath( - ClusterRpcSingleQueryManager singleQueryManager) { - throw new UnsupportedOperationException(); + ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException { + GroupByPlan queryPlan = (GroupByPlan) singleQueryManager.getOriginQueryPlan(); + List<Path> selectPaths = queryPlan.getPaths(); + List<String> aggregations = queryPlan.getAggregations(); + Map<String, SelectSeriesGroupEntity> selectGroupEntityMap = singleQueryManager + .getSelectSeriesGroupEntityMap(); + Map<String, List<String>> selectAggregationByGroupId = new HashMap<>(); + for (int i = 0; i < selectPaths.size(); i++) { + String aggregation = aggregations.get(i); + Path path = selectPaths.get(i); + String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice()); + if (!selectGroupEntityMap.containsKey(groupId)) { + selectGroupEntityMap.put(groupId, new SelectSeriesGroupEntity(groupId)); + selectAggregationByGroupId.put(groupId, new ArrayList<>()); + } + selectGroupEntityMap.get(groupId).addSelectPaths(path); + selectAggregationByGroupId.get(groupId).add(aggregation); + } + for (Entry<String, SelectSeriesGroupEntity> entry : selectGroupEntityMap.entrySet()) { + String groupId = entry.getKey(); + SelectSeriesGroupEntity entity = entry.getValue(); + List<Path> paths = entity.getSelectPaths(); + GroupByPlan subQueryPlan = new GroupByPlan(); + subQueryPlan.setIntervals(queryPlan.getIntervals()); + subQueryPlan.setOrigin(queryPlan.getOrigin()); + subQueryPlan.setUnit(queryPlan.getUnit()); + subQueryPlan.setProposer(queryPlan.getProposer()); + subQueryPlan.setPaths(paths); + subQueryPlan.setExpression(queryPlan.getExpression()); + subQueryPlan.setAggregations(selectAggregationByGroupId.get(groupId)); + entity.setQueryPlan(subQueryPlan); + } } /** diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java index 99476f2..9bf87c2 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java @@ -27,14 +27,13 @@ import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.ProcessorException; import org.apache.iotdb.db.metadata.MManager; -import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan; import org.apache.iotdb.db.qp.physical.crud.GroupByPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.executor.EngineQueryRouter; -import org.apache.iotdb.db.query.executor.IEngineQueryRouter; +import org.apache.iotdb.db.query.executor.AbstractQueryRouter; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.expression.QueryExpression; @@ -43,7 +42,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; public abstract class QueryProcessExecutor implements IQueryProcessExecutor { protected ThreadLocal<Integer> fetchSize = new ThreadLocal<>(); - protected IEngineQueryRouter queryRouter = new EngineQueryRouter(); + protected AbstractQueryRouter queryRouter = new EngineQueryRouter(); @Override public QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java index af74bf6..323bb82 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithOnlyTimeFilterDataSet.java @@ -51,7 +51,7 @@ public class GroupByWithOnlyTimeFilterDataSet extends GroupByEngineDataSet { protected List<IAggregateReader> sequenceReaderList; private List<BatchData> batchDataList; private List<Boolean> hasCachedSequenceDataList; - private Filter timeFilter; + protected Filter timeFilter; /** * constructor. @@ -96,7 +96,6 @@ public class GroupByWithOnlyTimeFilterDataSet extends GroupByEngineDataSet { sequenceReaderList.add(sequenceReader); unSequenceReaderList.add(unSeqMergeReader); } - } @Override @@ -128,7 +127,7 @@ public class GroupByWithOnlyTimeFilterDataSet extends GroupByEngineDataSet { * * @param idx series id */ - private AggreResultData nextSeries(int idx) throws IOException, ProcessorException { + protected AggreResultData nextSeries(int idx) throws IOException, ProcessorException { IPointReader unsequenceReader = unSequenceReaderList.get(idx); IAggregateReader sequenceReader = sequenceReaderList.get(idx); AggregateFunction function = functions.get(idx); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java index 2991ff7..528b378 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java @@ -40,8 +40,8 @@ import org.apache.iotdb.tsfile.utils.Pair; public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet { - private List<EngineReaderByTimeStamp> allDataReaderList; - private TimeGenerator timestampGenerator; + protected List<EngineReaderByTimeStamp> allDataReaderList; + protected TimeGenerator timestampGenerator; /** * cached timestamp for next group by partition. */ diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AbstractQueryRouter.java similarity index 59% rename from iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java rename to iotdb/src/main/java/org/apache/iotdb/db/query/executor/AbstractQueryRouter.java index 01c1aed..20f0f9d 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AbstractQueryRouter.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.query.executor; import java.io.IOException; +import java.util.LinkedList; import java.util.List; import java.util.Map; import org.apache.iotdb.db.exception.FileNodeManagerException; @@ -34,18 +35,18 @@ import org.apache.iotdb.tsfile.read.expression.QueryExpression; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.utils.Pair; -public interface IEngineQueryRouter { +public abstract class AbstractQueryRouter { /** * Execute physical plan. */ - QueryDataSet query(QueryExpression queryExpression, QueryContext context) + public abstract QueryDataSet query(QueryExpression queryExpression, QueryContext context) throws FileNodeManagerException, PathErrorException; /** * Execute aggregation query. */ - QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres, + public abstract QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres, IExpression expression, QueryContext context) throws QueryFilterOptimizationException, FileNodeManagerException, IOException, PathErrorException, ProcessorException; @@ -60,7 +61,7 @@ public interface IEngineQueryRouter { * each TimeUnit time from this point forward and backward. * @param intervals time intervals, closed interval. */ - QueryDataSet groupBy(List<Path> selectedSeries, List<String> aggres, + public abstract QueryDataSet groupBy(List<Path> selectedSeries, List<String> aggres, IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext context) throws ProcessorException, QueryFilterOptimizationException, FileNodeManagerException, @@ -72,7 +73,48 @@ public interface IEngineQueryRouter { * @param queryTime timestamp * @param fillType type IFill map */ - QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType, + public abstract QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType, QueryContext context) throws FileNodeManagerException, PathErrorException, IOException; + + /** + * sort intervals by start time and merge overlapping intervals. + * + * @param intervals time interval + */ + protected List<Pair<Long, Long>> mergeInterval(List<Pair<Long, Long>> intervals) { + // sort by interval start time. + intervals.sort(((o1, o2) -> (int) (o1.left - o2.left))); + + LinkedList<Pair<Long, Long>> merged = new LinkedList<>(); + for (Pair<Long, Long> interval : intervals) { + // if the list of merged intervals is empty or + // if the current interval does not overlap with the previous, simply append it. + if (merged.isEmpty() || merged.getLast().right < interval.left) { + merged.add(interval); + } else { + // otherwise, there is overlap, so we merge the current and previous intervals. + merged.getLast().right = Math.max(merged.getLast().right, interval.right); + } + } + return merged; + } + + /** + * Check the legitimacy of intervals + */ + protected void checkIntervals(List<Pair<Long, Long>> intervals) throws ProcessorException { + for (Pair<Long, Long> pair : intervals) { + if (!(pair.left > 0 && pair.right > 0)) { + throw new ProcessorException( + String.format("Time interval<%d, %d> must be greater than 0.", pair.left, pair.right)); + } + if (pair.right < pair.left) { + throw new ProcessorException(String.format( + "Interval starting time must be greater than the interval ending time, " + + "found error interval<%d, %d>", pair.left, pair.right)); + } + } + } + } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java index 32fcbf7..c46c6f1 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.query.executor; import java.io.IOException; -import java.util.LinkedList; import java.util.List; import java.util.Map; import org.apache.iotdb.db.exception.FileNodeManagerException; @@ -47,7 +46,7 @@ import org.apache.iotdb.tsfile.utils.Pair; * Query entrance class of IoTDB query process. All query clause will be transformed to physical * plan, physical plan will be executed by EngineQueryRouter. */ -public class EngineQueryRouter implements IEngineQueryRouter{ +public class EngineQueryRouter extends AbstractQueryRouter { @Override public QueryDataSet query(QueryExpression queryExpression, QueryContext context) @@ -110,18 +109,9 @@ public class EngineQueryRouter implements IEngineQueryRouter{ long nextJobId = context.getJobId(); - // check the legitimacy of intervals - for (Pair<Long, Long> pair : intervals) { - if (!(pair.left > 0 && pair.right > 0)) { - throw new ProcessorException( - String.format("Time interval<%d, %d> must be greater than 0.", pair.left, pair.right)); - } - if (pair.right < pair.left) { - throw new ProcessorException(String.format( - "Interval starting time must be greater than the interval ending time, " - + "found error interval<%d, %d>", pair.left, pair.right)); - } - } + //check the legitimacy of intervals + checkIntervals(intervals); + // merge intervals List<Pair<Long, Long>> mergedIntervalList = mergeInterval(intervals); @@ -173,28 +163,4 @@ public class EngineQueryRouter implements IEngineQueryRouter{ return fillEngineExecutor.execute(context); } - /** - * sort intervals by start time and merge overlapping intervals. - * - * @param intervals time interval - */ - private List<Pair<Long, Long>> mergeInterval(List<Pair<Long, Long>> intervals) { - // sort by interval start time. - intervals.sort(((o1, o2) -> (int) (o1.left - o2.left))); - - LinkedList<Pair<Long, Long>> merged = new LinkedList<>(); - for (Pair<Long, Long> interval : intervals) { - // if the list of merged intervals is empty or - // if the current interval does not overlap with the previous, simply append it. - if (merged.isEmpty() || merged.getLast().right < interval.left) { - merged.add(interval); - } else { - // otherwise, there is overlap, so we merge the current and previous intervals. - merged.getLast().right = Math.max(merged.getLast().right, interval.right); - } - } - return merged; - } - - }
