This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 3ee37c3afc234b347c82b9ac984fb487988f8aeb Author: lta <[email protected]> AuthorDate: Mon May 20 15:34:30 2019 +0800 fix some error bugs: add select series group entity, add query for all nodes features --- .../executor/ClusterAggregateEngineExecutor.java | 48 ++++---- .../executor/ClusterExecutorWithTimeGenerator.java | 35 ++---- .../ClusterExecutorWithoutTimeGenerator.java | 21 +++- .../query/executor/ClusterFillEngineExecutor.java | 15 ++- .../query/factory/ClusterSeriesReaderFactory.java | 28 +++-- .../ClusterRpcSingleQueryManager.java | 137 +++++++++------------ ...oupEntity.java => FilterSeriesGroupEntity.java} | 4 +- .../IClusterRpcSingleQueryManager.java | 7 -- ...oupEntity.java => SelectSeriesGroupEntity.java} | 55 +++------ .../AbstractClusterPointReader.java | 2 + .../coordinatornode/ClusterFilterSeriesReader.java | 17 +-- .../coordinatornode/ClusterSelectSeriesReader.java | 25 +--- .../timegenerator/ClusterNodeConstructor.java | 4 +- .../query/utils/ClusterTimeValuePairUtils.java | 18 +++ .../iotdb/cluster/query/utils/ExpressionUtils.java | 12 +- .../query/utils/QueryPlanPartitionUtils.java | 65 +++++----- .../query/manager/ClusterRpcManagerTest.java | 46 ++----- .../query/utils/QueryPlanPartitionUtilsTest.java | 56 +++++---- 18 files changed, 268 insertions(+), 327 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java index 51113c9..b34afa1 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.cluster.query.executor; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -29,9 
+30,11 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.exception.RaftConnectionException; import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; -import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity; +import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity; +import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity; import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader; import org.apache.iotdb.cluster.query.timegenerator.ClusterTimeGenerator; +import org.apache.iotdb.cluster.utils.QPExecutorUtils; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; @@ -79,18 +82,25 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor { public QueryDataSet executeWithoutTimeGenerator(QueryContext context) throws FileNodeManagerException, IOException, PathErrorException, ProcessorException { Filter timeFilter = expression != null ? 
((GlobalTimeExpression) expression).getFilter() : null; - Map<Path, ClusterSelectSeriesReader> selectPathReaders = queryManager.getSelectSeriesReaders(); + Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = queryManager + .getSelectSeriesGroupEntityMap(); List<Path> paths = new ArrayList<>(); List<IPointReader> readers = new ArrayList<>(); List<TSDataType> dataTypes = new ArrayList<>(); + //Mark filter series reader index group by group id + Map<String, Integer> selectSeriesReaderIndex = new HashMap<>(); for (int i = 0; i < selectedSeries.size(); i++) { Path path = selectedSeries.get(i); + String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice()); - if (selectPathReaders.containsKey(path)) { - ClusterSelectSeriesReader reader = selectPathReaders.get(path); + if (selectSeriesGroupEntityMap.containsKey(groupId)) { + int index = selectSeriesReaderIndex.getOrDefault(groupId, 0); + ClusterSelectSeriesReader reader = selectSeriesGroupEntityMap.get(groupId) + .getSelectSeriesReaders().get(index); readers.add(reader); dataTypes.add(reader.getDataType()); + selectSeriesReaderIndex.put(groupId, index + 1); } else { paths.add(path); // construct AggregateFunction @@ -140,15 +150,19 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor { /** add query token for query series which can handle locally **/ List<Path> localQuerySeries = new ArrayList<>(selectedSeries); - Set<Path> remoteQuerySeries = queryManager.getSelectSeriesReaders().keySet(); + Set<Path> remoteQuerySeries = new HashSet<>(); + queryManager.getSelectSeriesGroupEntityMap().values().forEach( + selectSeriesGroupEntity -> selectSeriesGroupEntity.getSelectPaths() + .forEach(path -> remoteQuerySeries.add(path))); localQuerySeries.removeAll(remoteQuerySeries); QueryResourceManager.getInstance() .beginQueryOfGivenQueryPaths(context.getJobId(), localQuerySeries); /** add query token for filter series which can handle locally **/ Set<String> deviceIdSet = new 
HashSet<>(); - for (FilterGroupEntity filterGroupEntity : queryManager.getFilterGroupEntityMap().values()) { - List<Path> remoteFilterSeries = filterGroupEntity.getFilterPaths(); + for (FilterSeriesGroupEntity filterSeriesGroupEntity : queryManager + .getFilterSeriesGroupEntityMap().values()) { + List<Path> remoteFilterSeries = filterSeriesGroupEntity.getFilterPaths(); remoteFilterSeries.forEach(seriesPath -> deviceIdSet.add(seriesPath.getDevice())); } QueryResourceManager.getInstance() @@ -156,32 +170,18 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor { ClusterTimeGenerator timestampGenerator; List<EngineReaderByTimeStamp> readersOfSelectedSeries; + // origin data type of select paths + List<TSDataType> originDataTypes = new ArrayList<>(); try { timestampGenerator = new ClusterTimeGenerator(expression, context, queryManager); readersOfSelectedSeries = ClusterSeriesReaderFactory .createReadersByTimestampOfSelectedPaths(selectedSeries, context, - queryManager); + queryManager, originDataTypes); } catch (IOException ex) { throw new FileNodeManagerException(ex); } - /** Get data type of select paths **/ - List<TSDataType> originDataTypes = new ArrayList<>(); - Map<Path, ClusterSelectSeriesReader> selectSeriesReaders = queryManager - .getSelectSeriesReaders(); - for (Path path : selectedSeries) { - try { - if (selectSeriesReaders.containsKey(path)) { - originDataTypes.add(selectSeriesReaders.get(path).getDataType()); - } else { - originDataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath())); - } - } catch (PathErrorException e) { - throw new FileNodeManagerException(e); - } - } - List<AggregateFunction> aggregateFunctions = new ArrayList<>(); for (int i = 0; i < selectedSeries.size(); i++) { TSDataType type = originDataTypes.get(i); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithTimeGenerator.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithTimeGenerator.java index fed8c0d..fe2511a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithTimeGenerator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithTimeGenerator.java @@ -27,7 +27,7 @@ import java.util.Set; import org.apache.iotdb.cluster.query.dataset.ClusterDataSetWithTimeGenerator; import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; -import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity; +import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity; import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader; import org.apache.iotdb.cluster.query.timegenerator.ClusterTimeGenerator; import org.apache.iotdb.db.exception.FileNodeManagerException; @@ -71,15 +71,19 @@ public class ClusterExecutorWithTimeGenerator { /** add query token for query series which can handle locally **/ List<Path> localQuerySeries = new ArrayList<>(queryExpression.getSelectedSeries()); - Set<Path> remoteQuerySeries = queryManager.getSelectSeriesReaders().keySet(); + Set<Path> remoteQuerySeries = new HashSet<>(); + queryManager.getSelectSeriesGroupEntityMap().values().forEach( + selectSeriesGroupEntity -> selectSeriesGroupEntity.getSelectPaths() + .forEach(path -> remoteQuerySeries.add(path))); localQuerySeries.removeAll(remoteQuerySeries); QueryResourceManager.getInstance() .beginQueryOfGivenQueryPaths(context.getJobId(), localQuerySeries); /** add query token for filter series which can handle locally **/ Set<String> deviceIdSet = new HashSet<>(); - for (FilterGroupEntity filterGroupEntity : queryManager.getFilterGroupEntityMap().values()) { - List<Path> remoteFilterSeries = filterGroupEntity.getFilterPaths(); + for 
(FilterSeriesGroupEntity filterSeriesGroupEntity : queryManager + .getFilterSeriesGroupEntityMap().values()) { + List<Path> remoteFilterSeries = filterSeriesGroupEntity.getFilterPaths(); remoteFilterSeries.forEach(seriesPath -> deviceIdSet.add(seriesPath.getDevice())); } QueryResourceManager.getInstance() @@ -88,33 +92,18 @@ public class ClusterExecutorWithTimeGenerator { ClusterTimeGenerator timestampGenerator; List<EngineReaderByTimeStamp> readersOfSelectedSeries; + /** Get data type of select paths **/ + List<TSDataType> dataTypes = new ArrayList<>(); try { timestampGenerator = new ClusterTimeGenerator(queryExpression.getExpression(), context, queryManager); readersOfSelectedSeries = ClusterSeriesReaderFactory .createReadersByTimestampOfSelectedPaths(queryExpression.getSelectedSeries(), context, - queryManager); - } catch (IOException ex) { + queryManager, dataTypes); + } catch (IOException | PathErrorException ex) { throw new FileNodeManagerException(ex); } - /** Get data type of select paths **/ - List<TSDataType> dataTypes = new ArrayList<>(); - Map<Path, ClusterSelectSeriesReader> selectSeriesReaders = queryManager - .getSelectSeriesReaders(); - for (Path path : queryExpression.getSelectedSeries()) { - try { - if (selectSeriesReaders.containsKey(path)) { - dataTypes.add(selectSeriesReaders.get(path).getDataType()); - } else { - dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath())); - } - } catch (PathErrorException e) { - throw new FileNodeManagerException(e); - } - - } - EngineReaderByTimeStamp[] readersOfSelectedSeriesArray = new EngineReaderByTimeStamp[readersOfSelectedSeries .size()]; int index = 0; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithoutTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithoutTimeGenerator.java index 8f42c9f..95e5f1a 100644 --- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithoutTimeGenerator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithoutTimeGenerator.java @@ -20,11 +20,15 @@ package org.apache.iotdb.cluster.query.executor; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; +import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity; import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader; +import org.apache.iotdb.cluster.utils.QPExecutorUtils; import org.apache.iotdb.db.exception.FileNodeManagerException; +import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator; @@ -62,7 +66,7 @@ public class ClusterExecutorWithoutTimeGenerator extends AbstractExecutorWithout * Execute query without filter or with only global time filter. 
*/ public QueryDataSet execute(QueryContext context) - throws FileNodeManagerException { + throws FileNodeManagerException, PathErrorException { Filter timeFilter = null; if (queryExpression.getExpression() != null) { @@ -72,15 +76,22 @@ public class ClusterExecutorWithoutTimeGenerator extends AbstractExecutorWithout List<IPointReader> readersOfSelectedSeries = new ArrayList<>(); List<TSDataType> dataTypes = new ArrayList<>(); - Map<Path, ClusterSelectSeriesReader> selectPathReaders = queryManager.getSelectSeriesReaders(); + Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = queryManager + .getSelectSeriesGroupEntityMap(); List<Path> paths = new ArrayList<>(); + //Mark filter series reader index group by group id + Map<String, Integer> selectSeriesReaderIndex = new HashMap<>(); for (Path path : queryExpression.getSelectedSeries()) { - if (selectPathReaders.containsKey(path)) { - ClusterSelectSeriesReader reader = selectPathReaders.get(path); + String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice()); + + if (selectSeriesGroupEntityMap.containsKey(groupId)) { + int index = selectSeriesReaderIndex.getOrDefault(groupId, 0); + ClusterSelectSeriesReader reader = selectSeriesGroupEntityMap.get(groupId) + .getSelectSeriesReaders().get(index); readersOfSelectedSeries.add(reader); dataTypes.add(reader.getDataType()); - + selectSeriesReaderIndex.put(groupId, index + 1); } else { IPointReader reader = createSeriesReader(context, path, dataTypes, timeFilter); readersOfSelectedSeries.add(reader); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterFillEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterFillEngineExecutor.java index 771637e..608a479 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterFillEngineExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterFillEngineExecutor.java @@ -20,10 +20,13 @@ package 
org.apache.iotdb.cluster.query.executor; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; +import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity; import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader; +import org.apache.iotdb.cluster.utils.QPExecutorUtils; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; @@ -58,16 +61,22 @@ public class ClusterFillEngineExecutor implements IFillEngineExecutor { @Override public QueryDataSet execute(QueryContext context) throws FileNodeManagerException, PathErrorException, IOException { - Map<Path, ClusterSelectSeriesReader> selectPathReaders = queryManager.getSelectSeriesReaders(); List<Path> paths = new ArrayList<>(); List<IFill> fillList = new ArrayList<>(); List<TSDataType> dataTypeList = new ArrayList<>(); List<IPointReader> readers = new ArrayList<>(); + Map<String, SelectSeriesGroupEntity> selectSeriesEntityMap = queryManager.getSelectSeriesGroupEntityMap(); + //Mark filter series reader index group by group id + Map<String, Integer> selectSeriesReaderIndex = new HashMap<>(); for (Path path : selectedSeries) { - if (selectPathReaders.containsKey(path)) { - ClusterSelectSeriesReader reader = selectPathReaders.get(path); + String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice()); + + if (selectSeriesEntityMap.containsKey(groupId)) { + int index = selectSeriesReaderIndex.getOrDefault(groupId, 0); + ClusterSelectSeriesReader reader = selectSeriesEntityMap.get(groupId).getSelectSeriesReaders().get(index); readers.add(reader); dataTypeList.add(reader.getDataType()); + selectSeriesReaderIndex.put(groupId, index + 1); } else { QueryDataSource 
queryDataSource = QueryResourceManager.getInstance() .getQueryDataSource(path, context); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java index d65ed58..a9ee032 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java @@ -20,18 +20,24 @@ package org.apache.iotdb.cluster.query.factory; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; +import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity; import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader; +import org.apache.iotdb.cluster.utils.QPExecutorUtils; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.FileNodeManagerException; +import org.apache.iotdb.db.exception.PathErrorException; +import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.factory.SeriesReaderFactory; import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; import org.apache.iotdb.db.query.reader.merge.PriorityMergeReaderByTimestamp; import org.apache.iotdb.db.query.reader.sequence.SequenceDataReaderByTimestamp; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; /** @@ -43,27 +49,35 @@ public class ClusterSeriesReaderFactory { } /** - * Construct ReaderByTimestamp , include sequential data and unsequential data. 
+ * Construct ReaderByTimestamp , include sequential data and unsequential data. And get all series dataType. * * @param paths selected series path * @param context query context * @return the list of EngineReaderByTimeStamp */ public static List<EngineReaderByTimeStamp> createReadersByTimestampOfSelectedPaths( - List<Path> paths, QueryContext context, ClusterRpcSingleQueryManager queryManager) - throws IOException, FileNodeManagerException { + List<Path> paths, QueryContext context, ClusterRpcSingleQueryManager queryManager, List<TSDataType> dataTypes) + throws IOException, FileNodeManagerException, PathErrorException { - Map<Path, ClusterSelectSeriesReader> selectSeriesReaders = queryManager.getSelectSeriesReaders(); + Map<String, SelectSeriesGroupEntity> selectSeriesEntityMap = queryManager + .getSelectSeriesGroupEntityMap(); List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>(); + //Mark filter series reader index group by group id + Map<String, Integer> selectSeriesReaderIndex = new HashMap<>(); for (Path path : paths) { - - if (selectSeriesReaders.containsKey(path)) { - readersOfSelectedSeries.add(selectSeriesReaders.get(path)); + String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice()); + if (selectSeriesEntityMap.containsKey(groupId)) { + int index = selectSeriesReaderIndex.getOrDefault(groupId, 0); + ClusterSelectSeriesReader reader = selectSeriesEntityMap.get(groupId).getSelectSeriesReaders().get(index); + readersOfSelectedSeries.add(reader); + dataTypes.add(reader.getDataType()); + selectSeriesReaderIndex.put(groupId, index + 1); } else { /** can handle series query locally **/ EngineReaderByTimeStamp readerByTimeStamp = createReaderByTimeStamp(path, context); readersOfSelectedSeries.add(readerByTimeStamp); + dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath())); } } return readersOfSelectedSeries; diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java index af4db31..905ce1b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java @@ -86,27 +86,15 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag // select path resource /** - * Query plans of select paths which are divided from queryPlan group by group id, it contains all - * group id ,including local data group if it involves. + * Select series group entity group by data group, key is group id(only contain remote group id) */ - private Map<String, QueryPlan> selectPathPlans = new HashMap<>(); - - /** - * Key is group id (only contains remote group id), value is all select series in group id. 
- */ - private Map<String, List<Path>> selectSeriesByGroupId = new HashMap<>(); - - /** - * Series reader of select paths (only contains remote series), key is series path , value is - * reader - */ - private Map<Path, ClusterSelectSeriesReader> selectSeriesReaders = new HashMap<>(); + private Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = new HashMap<>(); // filter path resource /** - * Filter group entity group by data group, key is group id(only contain remote group id) + * Filter series group entity group by data group, key is group id(only contain remote group id) */ - private Map<String, FilterGroupEntity> filterGroupEntityMap = new HashMap<>(); + private Map<String, FilterSeriesGroupEntity> filterSeriesGroupEntityMap = new HashMap<>(); private static final ClusterConfig CLUSTER_CONF = ClusterDescriptor.getInstance().getConfig(); @@ -140,17 +128,18 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag private void initSeriesReader(int readDataConsistencyLevel) throws RaftConnectionException, IOException { // Init all series with data group of select series,if filter series has the same data group, init them together. 
- for (Entry<String, QueryPlan> entry : selectPathPlans.entrySet()) { + for (Entry<String, SelectSeriesGroupEntity> entry : selectSeriesGroupEntityMap.entrySet()) { String groupId = entry.getKey(); - QueryPlan queryPlan = entry.getValue(); + SelectSeriesGroupEntity selectEntity = entry.getValue(); + QueryPlan queryPlan = selectEntity.getQueryPlan(); if (!QPExecutorUtils.canHandleQueryByGroupId(groupId)) { Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class); allQueryPlan.put(PathType.SELECT_PATH, queryPlan); List<Filter> filterList = new ArrayList<>(); - if (filterGroupEntityMap.containsKey(groupId)) { - FilterGroupEntity filterGroupEntity = filterGroupEntityMap.get(groupId); - allQueryPlan.put(PathType.FILTER_PATH, filterGroupEntity.getQueryPlan()); - filterList = filterGroupEntity.getFilters(); + if (filterSeriesGroupEntityMap.containsKey(groupId)) { + FilterSeriesGroupEntity filterSeriesGroupEntity = filterSeriesGroupEntityMap.get(groupId); + allQueryPlan.put(PathType.FILTER_PATH, filterSeriesGroupEntity.getQueryPlan()); + filterList = filterSeriesGroupEntity.getFilters(); } /** create request **/ BasicRequest request = InitSeriesReaderRequest @@ -161,27 +150,29 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag handleInitReaderResponse(groupId, allQueryPlan, response); } else { dataGroupUsage.add(groupId); - selectSeriesByGroupId.remove(groupId); - if (filterGroupEntityMap.containsKey(groupId)) { - filterGroupEntityMap.remove(groupId); - } + selectSeriesGroupEntityMap.remove(groupId); + filterSeriesGroupEntityMap.remove(groupId); } } //Init series reader with data groups of filter series, which don't exist in data groups list of select series. 
- for (Entry<String, FilterGroupEntity> entry : filterGroupEntityMap.entrySet()) { + for (Entry<String, FilterSeriesGroupEntity> entry : filterSeriesGroupEntityMap.entrySet()) { String groupId = entry.getKey(); - if (!selectPathPlans.containsKey(groupId)) { + if (!selectSeriesGroupEntityMap.containsKey(groupId) && !QPExecutorUtils + .canHandleQueryByGroupId(groupId)) { Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class); - FilterGroupEntity filterGroupEntity = filterGroupEntityMap.get(groupId); - allQueryPlan.put(PathType.FILTER_PATH, filterGroupEntity.getQueryPlan()); - List<Filter> filterList = filterGroupEntity.getFilters(); + FilterSeriesGroupEntity filterSeriesGroupEntity = filterSeriesGroupEntityMap.get(groupId); + allQueryPlan.put(PathType.FILTER_PATH, filterSeriesGroupEntity.getQueryPlan()); + List<Filter> filterList = filterSeriesGroupEntity.getFilters(); BasicRequest request = InitSeriesReaderRequest .createInitialQueryRequest(groupId, taskId, readDataConsistencyLevel, allQueryPlan, filterList); InitSeriesReaderResponse response = (InitSeriesReaderResponse) ClusterRpcReaderUtils .createClusterSeriesReader(groupId, request, this); handleInitReaderResponse(groupId, allQueryPlan, response); + } else if (!selectSeriesGroupEntityMap.containsKey(groupId)) { + dataGroupUsage.add(groupId); + filterSeriesGroupEntityMap.remove(groupId); } } } @@ -201,7 +192,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag TSDataType dataType = seriesType.get(i); ClusterSelectSeriesReader seriesReader = new ClusterSelectSeriesReader(groupId, seriesPath, dataType, this); - selectSeriesReaders.put(seriesPath, seriesReader); + selectSeriesGroupEntityMap.get(groupId).addSelectSeriesReader(seriesReader); } } if (allQueryPlan.containsKey(PathType.FILTER_PATH)) { @@ -213,10 +204,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag TSDataType dataType = seriesType.get(i); ClusterFilterSeriesReader 
seriesReader = new ClusterFilterSeriesReader(groupId, seriesPath, dataType, this); - if (!filterGroupEntityMap.containsKey(groupId)) { - filterGroupEntityMap.put(groupId, new FilterGroupEntity(groupId)); - } - filterGroupEntityMap.get(groupId).addFilterSeriesReader(seriesReader); + filterSeriesGroupEntityMap.get(groupId).addFilterSeriesReader(seriesReader); } } } @@ -224,16 +212,15 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag @Override public void fetchBatchDataForSelectPaths(String groupId) throws RaftConnectionException { List<String> fetchDataSeries = new ArrayList<>(); - Map<String, List<Path>> seriesByGroupId; - Map<Path, ClusterSelectSeriesReader> seriesReaders; - seriesByGroupId = selectSeriesByGroupId; - seriesReaders = selectSeriesReaders; - if (seriesByGroupId.containsKey(groupId)) { - List<Path> allFilterSeries = seriesByGroupId.get(groupId); - for (Path series : allFilterSeries) { - if (seriesReaders.get(series).enableFetchData()) { - fetchDataSeries.add(series.getFullPath()); - } + List<Integer> selectSeriesIndexs = new ArrayList<>(); + List<Path> selectSeries = selectSeriesGroupEntityMap.get(groupId).getSelectPaths(); + List<ClusterSelectSeriesReader> seriesReaders = selectSeriesGroupEntityMap.get(groupId) + .getSelectSeriesReaders(); + for (int i = 0; i < selectSeries.size(); i++) { + Path series = selectSeries.get(i); + if (seriesReaders.get(i).enableFetchData()) { + fetchDataSeries.add(series.getFullPath()); + selectSeriesIndexs.add(i); } } BasicRequest request = QuerySeriesDataRequest @@ -241,7 +228,8 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag queryRounds++); QuerySeriesDataResponse response = (QuerySeriesDataResponse) ClusterRpcReaderUtils .handleQueryRequest(request, queryNodes.get(groupId), 0); - handleFetchDataResponseForSelectPaths(fetchDataSeries, response); + + handleFetchDataResponseForSelectPaths(groupId, selectSeriesIndexs, response); } @Override @@ -258,42 
+246,45 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag @Override public void fetchBatchDataByTimestampForAllSelectPaths(List<Long> batchTimestamp) throws RaftConnectionException { - for (Entry<String, List<Path>> entry : selectSeriesByGroupId.entrySet()) { + for (Entry<String, SelectSeriesGroupEntity> entry : selectSeriesGroupEntityMap.entrySet()) { String groupId = entry.getKey(); List<String> fetchDataFilterSeries = new ArrayList<>(); - entry.getValue().forEach(path -> fetchDataFilterSeries.add(path.getFullPath())); + entry.getValue().getSelectPaths() + .forEach(path -> fetchDataFilterSeries.add(path.getFullPath())); BasicRequest request = QuerySeriesDataByTimestampRequest .createRequest(groupId, queryRounds++, taskId, batchTimestamp, fetchDataFilterSeries); QuerySeriesDataByTimestampResponse response = (QuerySeriesDataByTimestampResponse) ClusterRpcReaderUtils .handleQueryRequest(request, queryNodes.get(groupId), 0); - handleFetchDataByTimestampResponseForSelectPaths(fetchDataFilterSeries, response); + handleFetchDataByTimestampResponseForSelectPaths(groupId, fetchDataFilterSeries, response); } } /** * Handle response of fetching data, and add batch data to corresponding reader. 
*/ - private void handleFetchDataByTimestampResponseForSelectPaths(List<String> fetchDataSeries, + private void handleFetchDataByTimestampResponseForSelectPaths(String groupId, + List<String> fetchDataSeries, BasicQueryDataResponse response) { List<BatchData> batchDataList = response.getSeriesBatchData(); + List<ClusterSelectSeriesReader> selectSeriesReaders = selectSeriesGroupEntityMap.get(groupId) + .getSelectSeriesReaders(); for (int i = 0; i < fetchDataSeries.size(); i++) { - String series = fetchDataSeries.get(i); BatchData batchData = batchDataList.get(i); - selectSeriesReaders.get(new Path(series)) - .addBatchData(batchData, true); + selectSeriesReaders.get(i).addBatchData(batchData, true); } } /** * Handle response of fetching data, and add batch data to corresponding reader. */ - private void handleFetchDataResponseForSelectPaths(List<String> fetchDataSeries, - BasicQueryDataResponse response) { + private void handleFetchDataResponseForSelectPaths(String groupId, + List<Integer> selectSeriesIndexs, BasicQueryDataResponse response) { List<BatchData> batchDataList = response.getSeriesBatchData(); - for (int i = 0; i < fetchDataSeries.size(); i++) { - String series = fetchDataSeries.get(i); + List<ClusterSelectSeriesReader> selectSeriesReaders = selectSeriesGroupEntityMap.get(groupId) + .getSelectSeriesReaders(); + for (int i = 0; i < selectSeriesIndexs.size(); i++) { BatchData batchData = batchDataList.get(i); - selectSeriesReaders.get(new Path(series)) + selectSeriesReaders.get(selectSeriesIndexs.get(i)) .addBatchData(batchData, batchData.length() < CLUSTER_CONF.getBatchReadSize()); } } @@ -303,10 +294,11 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag */ private void handleFetchDataResponseForFilterPaths(String groupId, QuerySeriesDataResponse response) { - FilterGroupEntity filterGroupEntity = filterGroupEntityMap.get(groupId); - List<Path> fetchDataSeries = filterGroupEntity.getFilterPaths(); + FilterSeriesGroupEntity 
filterSeriesGroupEntity = filterSeriesGroupEntityMap.get(groupId); + List<Path> fetchDataSeries = filterSeriesGroupEntity.getFilterPaths(); List<BatchData> batchDataList = response.getSeriesBatchData(); - List<ClusterFilterSeriesReader> filterReaders = filterGroupEntity.getFilterSeriesReaders(); + List<ClusterFilterSeriesReader> filterReaders = filterSeriesGroupEntity + .getFilterSeriesReaders(); boolean remoteDataFinish = true; for (int i = 0; i < batchDataList.size(); i++) { if (batchDataList.get(i).length() != 0) { @@ -323,11 +315,6 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag } @Override - public QueryPlan getSelectPathQueryPlan(String fullPath) { - return selectPathPlans.get(fullPath); - } - - @Override public void setDataGroupReaderNode(String groupId, PeerId readerNode) { queryNodes.put(groupId, readerNode); } @@ -375,19 +362,11 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag this.queryNodes.put(groupID, peerId); } - public Map<String, QueryPlan> getSelectPathPlans() { - return selectPathPlans; - } - - public Map<String, List<Path>> getSelectSeriesByGroupId() { - return selectSeriesByGroupId; - } - - public Map<Path, ClusterSelectSeriesReader> getSelectSeriesReaders() { - return selectSeriesReaders; + public Map<String, SelectSeriesGroupEntity> getSelectSeriesGroupEntityMap() { + return selectSeriesGroupEntityMap; } - public Map<String, FilterGroupEntity> getFilterGroupEntityMap() { - return filterGroupEntityMap; + public Map<String, FilterSeriesGroupEntity> getFilterSeriesGroupEntityMap() { + return filterSeriesGroupEntityMap; } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterGroupEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterSeriesGroupEntity.java similarity index 97% copy from cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterGroupEntity.java copy to 
cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterSeriesGroupEntity.java index 326af11..19407a0 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterGroupEntity.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterSeriesGroupEntity.java @@ -28,7 +28,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; /** * Filter entities of a data group, concluding QueryPlan, filters, all filter paths and filter readers */ -public class FilterGroupEntity { +public class FilterSeriesGroupEntity { /** * Group id @@ -62,7 +62,7 @@ public class FilterGroupEntity { */ private List<ClusterFilterSeriesReader> filterSeriesReaders; - public FilterGroupEntity(String groupId) { + public FilterSeriesGroupEntity(String groupId) { this.groupId = groupId; this.filterPaths = new ArrayList<>(); this.filters = new ArrayList<>(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java index c4aec9c..d6ca0d7 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java @@ -69,13 +69,6 @@ public interface IClusterRpcSingleQueryManager { throws RaftConnectionException; /** - * Get query plan of select path - * - * @param fullPath Timeseries full path in select paths - */ - QueryPlan getSelectPathQueryPlan(String fullPath); - - /** * Set reader node of a data group * * @param groupId data group id diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterGroupEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java 
similarity index 55% rename from cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterGroupEntity.java rename to cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java index 326af11..9f35117 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterGroupEntity.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java @@ -21,15 +21,14 @@ package org.apache.iotdb.cluster.query.manager.coordinatornode; import java.util.ArrayList; import java.util.List; import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterFilterSeriesReader; +import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.tsfile.read.common.Path; -import org.apache.iotdb.tsfile.read.filter.basic.Filter; /** - * Filter entities of a data group, concluding QueryPlan, filters, all filter paths and filter readers + * Select series entities of a data group, including QueryPlan, all select paths and series readers */ -public class FilterGroupEntity { - +public class SelectSeriesGroupEntity { /** * Group id */ @@ -41,32 +40,24 @@ public class FilterGroupEntity { private QueryPlan queryPlan; /** - * Filters of filter path. 
- */ - private List<Filter> filters; - - /** * - * all filter series + * all select series * <p> - * Note: It may contain multiple series in a complicated tree - * for example: select * from root.vehicle where d0.s0 > 10 and d0.s0 < 101 or time = 12, - * filter tree: <code>[[[[root.vehicle.d0.s0:time == 12] || [root.vehicle.d0.s1:time == 12]] || [root.vehicle.d1.s2:time == 12]] || [root.vehicle.d1.s3:time == 12]]</code> + * Note: It may contain multiple series in a query + * for example: select sum(s0), max(s0) from root.vehicle.d0 where s0 > 10 * </p> */ - private List<Path> filterPaths; - + private List<Path> selectPaths; /** * Series reader of filter paths (only contains remote series) */ - private List<ClusterFilterSeriesReader> filterSeriesReaders; + private List<ClusterSelectSeriesReader> selectSeriesReaders; - public FilterGroupEntity(String groupId) { + public SelectSeriesGroupEntity(String groupId) { this.groupId = groupId; - this.filterPaths = new ArrayList<>(); - this.filters = new ArrayList<>(); - this.filterSeriesReaders = new ArrayList<>(); + this.selectPaths = new ArrayList<>(); + this.selectSeriesReaders = new ArrayList<>(); } public String getGroupId() { @@ -85,27 +76,19 @@ public class FilterGroupEntity { this.queryPlan = queryPlan; } - public List<Filter> getFilters() { - return filters; - } - - public void addFilter(Filter filter) { - this.filters.add(filter); - } - - public List<Path> getFilterPaths() { - return filterPaths; + public List<Path> getSelectPaths() { + return selectPaths; } - public void addFilterPaths(Path filterPath) { - this.filterPaths.add(filterPath); + public void addSelectPaths(Path selectPath) { + this.selectPaths.add(selectPath); } - public List<ClusterFilterSeriesReader> getFilterSeriesReaders() { - return filterSeriesReaders; + public List<ClusterSelectSeriesReader> getSelectSeriesReaders() { + return selectSeriesReaders; } - public void addFilterSeriesReader(ClusterFilterSeriesReader filterSeriesReader) { - 
this.filterSeriesReaders.add(filterSeriesReader); + public void addSelectSeriesReader(ClusterSelectSeriesReader selectSeriesReader) { + this.selectSeriesReaders.add(selectSeriesReader); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java index 3f73160..c0012a1 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java @@ -71,4 +71,6 @@ public abstract class AbstractClusterPointReader implements IPointReader { } return null; } + + public abstract void addBatchData(BatchData batchData, boolean remoteDataFinish); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java index 805d3af..0c0287e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java @@ -95,14 +95,6 @@ public class ClusterFilterSeriesReader extends AbstractClusterPointReader { //Do nothing } - public Path getSeriesPath() { - return seriesPath; - } - - public void setSeriesPath(Path seriesPath) { - this.seriesPath = seriesPath; - } - public TSDataType getDataType() { return dataType; } @@ -111,14 +103,7 @@ public class ClusterFilterSeriesReader extends AbstractClusterPointReader { this.dataType = dataType; } - public BatchData getCurrentBatchData() { - return currentBatchData; - } - - public void setCurrentBatchData(BatchData currentBatchData) { - this.currentBatchData = currentBatchData; - } - + @Override 
public void addBatchData(BatchData batchData, boolean remoteDataFinish) { batchDataList.addLast(batchData); this.remoteDataFinish = remoteDataFinish; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java index 0a507d5..c640b53 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java @@ -119,14 +119,6 @@ public class ClusterSelectSeriesReader extends AbstractClusterPointReader implem batchDataList = null; } - public Path getSeriesPath() { - return seriesPath; - } - - public void setSeriesPath(Path seriesPath) { - this.seriesPath = seriesPath; - } - public TSDataType getDataType() { return dataType; } @@ -135,27 +127,12 @@ public class ClusterSelectSeriesReader extends AbstractClusterPointReader implem this.dataType = dataType; } - public BatchData getCurrentBatchData() { - return currentBatchData; - } - - public void setCurrentBatchData(BatchData currentBatchData) { - this.currentBatchData = currentBatchData; - } - + @Override public void addBatchData(BatchData batchData, boolean remoteDataFinish) { batchDataList.addLast(batchData); this.remoteDataFinish = remoteDataFinish; } - public boolean isRemoteDataFinish() { - return remoteDataFinish; - } - - public void setRemoteDataFinish(boolean remoteDataFinish) { - this.remoteDataFinish = remoteDataFinish; - } - /** * Check if this series need to fetch data from remote query node */ diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/timegenerator/ClusterNodeConstructor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/timegenerator/ClusterNodeConstructor.java index 639dce8..2b3ab18 100644 --- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/timegenerator/ClusterNodeConstructor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/timegenerator/ClusterNodeConstructor.java @@ -25,7 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; -import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity; +import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity; import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterFilterSeriesReader; import org.apache.iotdb.cluster.utils.QPExecutorUtils; import org.apache.iotdb.db.exception.FileNodeManagerException; @@ -65,7 +65,7 @@ public class ClusterNodeConstructor extends AbstractNodeConstructor { * Init filter series reader */ private void init(ClusterRpcSingleQueryManager queryManager) { - Map<String, FilterGroupEntity> filterGroupEntityMap = queryManager.getFilterGroupEntityMap(); + Map<String, FilterSeriesGroupEntity> filterGroupEntityMap = queryManager.getFilterSeriesGroupEntityMap(); filterGroupEntityMap.forEach( (key, value) -> filterSeriesReadersByGroupId.put(key, value.getFilterSeriesReaders())); filterSeriesReadersByGroupId.forEach((key, value) -> filterSeriesReaderIndex.put(key, 0)); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java index a0ee256..0f05cf2 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.iotdb.cluster.query.utils; import org.apache.iotdb.cluster.query.manager.common.FillBatchData; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ExpressionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ExpressionUtils.java index 0024138..4089e9b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ExpressionUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ExpressionUtils.java @@ -26,7 +26,7 @@ import static org.apache.iotdb.tsfile.read.expression.ExpressionType.TRUE; import java.util.List; import java.util.Map; import org.apache.iotdb.cluster.query.expression.TrueExpression; -import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity; +import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity; import org.apache.iotdb.cluster.utils.QPExecutorUtils; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; @@ -46,17 +46,17 @@ public class ExpressionUtils { * Get all series path of expression group by group id */ public static void getAllExpressionSeries(IExpression expression, - Map<String, FilterGroupEntity> filterGroupEntityMap) + 
Map<String, FilterSeriesGroupEntity> filterGroupEntityMap) throws PathErrorException { if (expression.getType() == ExpressionType.SERIES) { Path path = ((SingleSeriesExpression) expression).getSeriesPath(); String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice()); if (!filterGroupEntityMap.containsKey(groupId)) { - filterGroupEntityMap.put(groupId, new FilterGroupEntity(groupId)); + filterGroupEntityMap.put(groupId, new FilterSeriesGroupEntity(groupId)); } - FilterGroupEntity filterGroupEntity = filterGroupEntityMap.get(groupId); - filterGroupEntity.addFilterPaths(path); - filterGroupEntity.addFilter(((SingleSeriesExpression) expression).getFilter()); + FilterSeriesGroupEntity filterSeriesGroupEntity = filterGroupEntityMap.get(groupId); + filterSeriesGroupEntity.addFilterPaths(path); + filterSeriesGroupEntity.addFilter(((SingleSeriesExpression) expression).getFilter()); } else if (expression.getType() == OR || expression.getType() == AND) { getAllExpressionSeries(((IBinaryExpression) expression).getLeft(), filterGroupEntityMap); getAllExpressionSeries(((IBinaryExpression) expression).getRight(), filterGroupEntityMap); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java index 5fbd30c..3a2746f 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java @@ -25,7 +25,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; -import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity; +import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity; +import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity; 
import org.apache.iotdb.cluster.utils.QPExecutorUtils; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; @@ -84,24 +85,24 @@ public class QueryPlanPartitionUtils { private static void splitQueryPlanBySelectPath(ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException { QueryPlan queryPlan = singleQueryManager.getOriginQueryPlan(); - Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId(); - Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans(); + // split query plan by select path + Map<String, SelectSeriesGroupEntity> selectGroupEntityMap = singleQueryManager + .getSelectSeriesGroupEntityMap(); List<Path> selectPaths = queryPlan.getPaths(); for (Path path : selectPaths) { String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice()); - if (!selectSeriesByGroupId.containsKey(groupId)) { - selectSeriesByGroupId.put(groupId, new ArrayList<>()); + if (!selectGroupEntityMap.containsKey(groupId)) { + selectGroupEntityMap.put(groupId, new SelectSeriesGroupEntity(groupId)); } - selectSeriesByGroupId.get(groupId).add(path); + selectGroupEntityMap.get(groupId).addSelectPaths(path); } - for (Entry<String, List<Path>> entry : selectSeriesByGroupId.entrySet()) { - String groupId = entry.getKey(); - List<Path> paths = entry.getValue(); + for (SelectSeriesGroupEntity entity : selectGroupEntityMap.values()) { + List<Path> paths = entity.getSelectPaths(); QueryPlan subQueryPlan = new QueryPlan(); subQueryPlan.setProposer(queryPlan.getProposer()); subQueryPlan.setPaths(paths); subQueryPlan.setExpression(queryPlan.getExpression()); - selectPathPlans.put(groupId, subQueryPlan); + entity.setQueryPlan(subQueryPlan); } } @@ -113,12 +114,12 @@ public class QueryPlanPartitionUtils { throws PathErrorException { QueryPlan queryPlan = singleQueryManager.getOriginQueryPlan(); // split query plan by filter path - Map<String, 
FilterGroupEntity> filterGroupEntityMap = singleQueryManager - .getFilterGroupEntityMap(); + Map<String, FilterSeriesGroupEntity> filterGroupEntityMap = singleQueryManager + .getFilterSeriesGroupEntityMap(); IExpression expression = queryPlan.getExpression(); ExpressionUtils.getAllExpressionSeries(expression, filterGroupEntityMap); - for (FilterGroupEntity filterGroupEntity : filterGroupEntityMap.values()) { - List<Path> filterSeriesList = filterGroupEntity.getFilterPaths(); + for (FilterSeriesGroupEntity filterSeriesGroupEntity : filterGroupEntityMap.values()) { + List<Path> filterSeriesList = filterSeriesGroupEntity.getFilterPaths(); // create filter sub query plan QueryPlan subQueryPlan = new QueryPlan(); subQueryPlan.setPaths(filterSeriesList); @@ -127,7 +128,7 @@ public class QueryPlanPartitionUtils { if (subExpression.getType() != ExpressionType.TRUE) { subQueryPlan.setExpression(subExpression); } - filterGroupEntity.setQueryPlan(subQueryPlan); + filterSeriesGroupEntity.setQueryPlan(subQueryPlan); } } @@ -157,29 +158,30 @@ public class QueryPlanPartitionUtils { AggregationPlan queryPlan = (AggregationPlan) singleQueryManager.getOriginQueryPlan(); List<Path> selectPaths = queryPlan.getPaths(); List<String> aggregations = queryPlan.getAggregations(); - Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId(); Map<String, List<String>> selectAggregationByGroupId = new HashMap<>(); - Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans(); + Map<String, SelectSeriesGroupEntity> selectGroupEntityMap = singleQueryManager + .getSelectSeriesGroupEntityMap(); for (int i = 0; i < selectPaths.size(); i++) { Path path = selectPaths.get(i); String aggregation = aggregations.get(i); String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice()); - if (!selectSeriesByGroupId.containsKey(groupId)) { - selectSeriesByGroupId.put(groupId, new ArrayList<>()); + if (!selectGroupEntityMap.containsKey(groupId)) { 
+ selectGroupEntityMap.put(groupId, new SelectSeriesGroupEntity(groupId)); selectAggregationByGroupId.put(groupId, new ArrayList<>()); } selectAggregationByGroupId.get(groupId).add(aggregation); - selectSeriesByGroupId.get(groupId).add(path); + selectGroupEntityMap.get(groupId).addSelectPaths(path); } - for (Entry<String, List<Path>> entry : selectSeriesByGroupId.entrySet()) { + for (Entry<String, SelectSeriesGroupEntity> entry : selectGroupEntityMap.entrySet()) { String groupId = entry.getKey(); - List<Path> paths = entry.getValue(); + SelectSeriesGroupEntity entity = entry.getValue(); + List<Path> paths = entity.getSelectPaths(); AggregationPlan subQueryPlan = new AggregationPlan(); subQueryPlan.setProposer(queryPlan.getProposer()); subQueryPlan.setPaths(paths); subQueryPlan.setExpression(queryPlan.getExpression()); subQueryPlan.setAggregations(selectAggregationByGroupId.get(groupId)); - selectPathPlans.put(groupId, subQueryPlan); + entity.setQueryPlan(subQueryPlan); } } @@ -200,25 +202,24 @@ public class QueryPlanPartitionUtils { throws PathErrorException { FillQueryPlan fillQueryPlan = (FillQueryPlan) singleQueryManager.getOriginQueryPlan(); List<Path> selectPaths = fillQueryPlan.getPaths(); - Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId(); - Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans(); + Map<String, SelectSeriesGroupEntity> selectGroupEntityMap = singleQueryManager + .getSelectSeriesGroupEntityMap(); for (Path path : selectPaths) { String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice()); - if (!selectSeriesByGroupId.containsKey(groupId)) { - selectSeriesByGroupId.put(groupId, new ArrayList<>()); + if (!selectGroupEntityMap.containsKey(groupId)) { + selectGroupEntityMap.put(groupId, new SelectSeriesGroupEntity(groupId)); } - selectSeriesByGroupId.get(groupId).add(path); + selectGroupEntityMap.get(groupId).addSelectPaths(path); } - for (Entry<String, List<Path>> 
entry : selectSeriesByGroupId.entrySet()) { - String groupId = entry.getKey(); - List<Path> paths = entry.getValue(); + for (SelectSeriesGroupEntity entity : selectGroupEntityMap.values()) { + List<Path> paths = entity.getSelectPaths(); FillQueryPlan subQueryPlan = new FillQueryPlan(); subQueryPlan.setProposer(fillQueryPlan.getProposer()); subQueryPlan.setPaths(paths); subQueryPlan.setExpression(fillQueryPlan.getExpression()); subQueryPlan.setQueryTime(fillQueryPlan.getQueryTime()); subQueryPlan.setFillType(new EnumMap<>(fillQueryPlan.getFillType())); - selectPathPlans.put(groupId, subQueryPlan); + entity.setQueryPlan(subQueryPlan); } } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterRpcManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterRpcManagerTest.java index b800fbf..4745553 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterRpcManagerTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterRpcManagerTest.java @@ -39,7 +39,8 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.entity.Server; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcQueryManager; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; -import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity; +import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity; +import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity; import org.apache.iotdb.cluster.utils.EnvironmentUtils; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.jdbc.Config; @@ -261,25 +262,8 @@ public class ClusterRpcManagerTest { assertEquals(taskId, singleManager.getTaskId()); // select path plans - Map<String, QueryPlan> selectPathPlans = singleManager.getSelectPathPlans(); - assertEquals(1, 
selectPathPlans.size()); - for (QueryPlan queryPlan : selectPathPlans.values()) { - List<Path> paths = queryPlan.getPaths(); - List<Path> correctPaths = new ArrayList<>(); - correctPaths.add(new Path("root.vehicle.d0.s0")); - correctPaths.add(new Path("root.vehicle.d0.s1")); - correctPaths.add(new Path("root.vehicle.d0.s3")); - assertEquals(correctPaths, paths); - assertNull(queryPlan.getExpression()); - } - - // select series by group id - assertEquals(0, singleManager.getSelectSeriesByGroupId().size()); - - // select series reader - assertTrue(singleManager - .getSelectSeriesReaders().isEmpty()); - + Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = singleManager.getSelectSeriesGroupEntityMap(); + assertTrue(selectSeriesGroupEntityMap.isEmpty()); } statement.close(); } @@ -304,27 +288,11 @@ public class ClusterRpcManagerTest { assertEquals(taskId, singleManager.getTaskId()); // select path plans - Map<String, QueryPlan> selectPathPlans = singleManager.getSelectPathPlans(); - assertEquals(1, selectPathPlans.size()); - for (QueryPlan queryPlan : selectPathPlans.values()) { - List<Path> paths = queryPlan.getPaths(); - List<Path> correctPaths = new ArrayList<>(); - correctPaths.add(new Path("root.vehicle.d0.s0")); - correctPaths.add(new Path("root.vehicle.d0.s1")); - correctPaths.add(new Path("root.vehicle.d0.s3")); - assertEquals(correctPaths, paths); - assertNotNull(queryPlan.getExpression()); - } - - // select series by group id - assertTrue(singleManager.getSelectSeriesByGroupId().isEmpty()); - - // select series reader - assertTrue(singleManager - .getSelectSeriesReaders().isEmpty()); + Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = singleManager.getSelectSeriesGroupEntityMap(); + assertTrue(selectSeriesGroupEntityMap.isEmpty()); // filter path plans - Map<String, FilterGroupEntity> filterGroupEntityMap = singleManager.getFilterGroupEntityMap(); + Map<String, FilterSeriesGroupEntity> filterGroupEntityMap = 
singleManager.getFilterSeriesGroupEntityMap(); assertTrue(filterGroupEntityMap.isEmpty()); } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java index a0d409b..363ef98 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java @@ -39,8 +39,11 @@ import org.apache.iotdb.cluster.entity.Server; import org.apache.iotdb.cluster.qp.executor.ClusterQueryProcessExecutor; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcQueryManager; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; -import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity; +import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity; +import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity; import org.apache.iotdb.cluster.utils.EnvironmentUtils; +import org.apache.iotdb.cluster.utils.QPExecutorUtils; +import org.apache.iotdb.cluster.utils.hash.PhysicalNode; import org.apache.iotdb.db.qp.QueryProcessor; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.jdbc.Config; @@ -60,6 +63,9 @@ public class QueryPlanPartitionUtilsTest { private static ClusterRpcQueryManager manager = ClusterRpcQueryManager.getInstance(); private ClusterQueryProcessExecutor queryDataExecutor = new ClusterQueryProcessExecutor(); private QueryProcessor queryProcessor = new QueryProcessor(queryDataExecutor); + private static final PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(), + CLUSTER_CONFIG.getPort()); + private static final String URL = "127.0.0.1:6667/"; @@ -105,6 +111,7 @@ public class QueryPlanPartitionUtilsTest { EnvironmentUtils.cleanEnv(); 
EnvironmentUtils.closeStatMonitor(); EnvironmentUtils.closeMemControl(); + QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); CLUSTER_CONFIG.createAllPath(); server = Server.getInstance(); server.start(); @@ -115,6 +122,7 @@ public class QueryPlanPartitionUtilsTest { @After public void tearDown() throws Exception { server.stop(); + QPExecutorUtils.setLocalNodeAddr(localNode.getIp(), localNode.getPort()); EnvironmentUtils.cleanEnv(); } @@ -230,14 +238,14 @@ public class QueryPlanPartitionUtilsTest { } @Test - public void splitQueryPlanWithoutValueFilter() throws Exception{ + public void splitQueryPlanWithoutValueFilter() throws Exception { try (Connection connection = DriverManager .getConnection(Config.IOTDB_URL_PREFIX + URL, "root", "root")) { insertData(connection, createSQLs, insertSQLs); initCorrectResult(); - for(int i = 0 ; i < queryStatementsWithoutFilters.length; i++) { + for (int i = 0; i < queryStatementsWithoutFilters.length; i++) { String queryStatementsWithoutFilter = queryStatementsWithoutFilters[i]; - try(Statement statement = connection.createStatement()) { + try (Statement statement = connection.createStatement()) { boolean hasResultSet = statement.execute(queryStatementsWithoutFilter); assertTrue(hasResultSet); ResultSet resultSet = statement.getResultSet(); @@ -256,14 +264,15 @@ public class QueryPlanPartitionUtilsTest { assertEquals(taskId, String.format("%s:%d", LOCAL_ADDR, jobId)); ClusterRpcSingleQueryManager singleQueryManager = ClusterRpcQueryManager.getInstance() .getSingleQuery(jobId); - assertTrue(singleQueryManager.getFilterGroupEntityMap().isEmpty()); - Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans(); - assertFalse(selectPathPlans.isEmpty()); - for(Entry<String, QueryPlan> entry1: selectPathPlans.entrySet()){ - QueryPlan queryPlan = entry1.getValue(); + assertTrue(singleQueryManager.getFilterSeriesGroupEntityMap().isEmpty()); + Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = 
singleQueryManager + .getSelectSeriesGroupEntityMap(); + assertFalse(selectSeriesGroupEntityMap.isEmpty()); + for (SelectSeriesGroupEntity entity : selectSeriesGroupEntityMap.values()) { + QueryPlan queryPlan = entity.getQueryPlan(); QueryPlan correctQueryPlan = withoutFilterResults.get(i + 1); assertTrue(correctQueryPlan.getPaths().containsAll(queryPlan.getPaths())); - assertEquals(correctQueryPlan.getExpression(),queryPlan.getExpression()); + assertEquals(correctQueryPlan.getExpression(), queryPlan.getExpression()); assertEquals(correctQueryPlan.isQuery(), queryPlan.isQuery()); assertEquals(correctQueryPlan.getOperatorType(), queryPlan.getOperatorType()); assertEquals(correctQueryPlan.getAggregations(), queryPlan.getAggregations()); @@ -275,14 +284,14 @@ public class QueryPlanPartitionUtilsTest { } @Test - public void splitQueryPlanWithValueFilter() throws Exception{ + public void splitQueryPlanWithValueFilter() throws Exception { try (Connection connection = DriverManager .getConnection(Config.IOTDB_URL_PREFIX + URL, "root", "root")) { insertData(connection, createSQLs, insertSQLs); initCorrectResult(); - for(int i = 0 ; i < queryStatementsWithFilters.length; i++) { + for (int i = 0; i < queryStatementsWithFilters.length; i++) { String queryStatementsWithoutFilter = queryStatementsWithFilters[i]; - try(Statement statement = connection.createStatement()) { + try (Statement statement = connection.createStatement()) { boolean hasResultSet = statement.execute(queryStatementsWithoutFilter); assertTrue(hasResultSet); ResultSet resultSet = statement.getResultSet(); @@ -301,21 +310,24 @@ public class QueryPlanPartitionUtilsTest { assertEquals(taskId, String.format("%s:%d", LOCAL_ADDR, jobId)); ClusterRpcSingleQueryManager singleQueryManager = ClusterRpcQueryManager.getInstance() .getSingleQuery(jobId); - assertTrue(singleQueryManager.getFilterGroupEntityMap().isEmpty()); - Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans(); - 
assertFalse(selectPathPlans.isEmpty()); - for(Entry<String, QueryPlan> entry1 : selectPathPlans.entrySet()) { - QueryPlan queryPlan = entry1.getValue(); + assertFalse(singleQueryManager.getFilterSeriesGroupEntityMap().isEmpty()); + Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = singleQueryManager + .getSelectSeriesGroupEntityMap(); + assertFalse(selectSeriesGroupEntityMap.isEmpty()); + for (SelectSeriesGroupEntity entity : selectSeriesGroupEntityMap.values()) { + QueryPlan queryPlan = entity.getQueryPlan(); QueryPlan correctQueryPlan = withFilterSelectResults.get(i + 1); assertTrue(correctQueryPlan.getPaths().containsAll(queryPlan.getPaths())); - assertEquals(correctQueryPlan.getExpression().getType(), queryPlan.getExpression().getType()); + assertEquals(correctQueryPlan.getExpression().getType(), + queryPlan.getExpression().getType()); assertEquals(correctQueryPlan.isQuery(), queryPlan.isQuery()); assertEquals(correctQueryPlan.getOperatorType(), queryPlan.getOperatorType()); assertEquals(correctQueryPlan.getAggregations(), queryPlan.getAggregations()); } - Map<String, FilterGroupEntity> filterGroupEntityMap = singleQueryManager.getFilterGroupEntityMap(); - for (FilterGroupEntity filterGroupEntity:filterGroupEntityMap.values()) { - QueryPlan queryPlan = filterGroupEntity.getQueryPlan(); + Map<String, FilterSeriesGroupEntity> filterGroupEntityMap = singleQueryManager + .getFilterSeriesGroupEntityMap(); + for (FilterSeriesGroupEntity filterSeriesGroupEntity : filterGroupEntityMap.values()) { + QueryPlan queryPlan = filterSeriesGroupEntity.getQueryPlan(); QueryPlan correctQueryPlan = withFilterFilterResults.get(i + 1); assertTrue(correctQueryPlan.getPaths().containsAll(queryPlan.getPaths())); assertEquals(correctQueryPlan.getExpression().getType(),
