This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 9487dbfc0a68fb62efa5270cc4fb080ea4e2a3a0 Author: lta <[email protected]> AuthorDate: Mon May 20 17:27:57 2019 +0800 add it test of aggregation function --- .../query/{manager => }/common/FillBatchData.java | 2 +- .../executor/ClusterAggregateEngineExecutor.java | 1 - .../ClusterRpcSingleQueryManager.java | 26 ++--- .../IClusterRpcSingleQueryManager.java | 4 +- .../coordinatornode/SelectSeriesGroupEntity.java | 1 - .../querynode/ClusterLocalSingleQueryManager.java | 76 +++++++++------ .../coordinatornode/ClusterFilterSeriesReader.java | 2 +- .../ClusterFillSelectSeriesBatchReader.java | 2 +- ...a => ClusterFilterSeriesBatchReaderEntity.java} | 6 +- ...a => ClusterSelectSeriesBatchReaderEntity.java} | 42 +++++++-- ... => IClusterFilterSeriesBatchReaderEntity.java} | 2 +- .../query/utils/ClusterTimeValuePairUtils.java | 2 +- .../QuerySeriesDataByTimestampRequest.java | 17 +--- .../request/querydata/QuerySeriesDataRequest.java | 16 ++-- .../integration/IoTDBAggregationLargeDataIT.java | 3 +- .../integration/IoTDBAggregationSmallDataIT.java | 94 ------------------ .../query/manager/ClusterLocalManagerTest.java | 105 +++++++++++---------- 17 files changed, 160 insertions(+), 241 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/common/FillBatchData.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java similarity index 97% rename from cluster/src/main/java/org/apache/iotdb/cluster/query/manager/common/FillBatchData.java rename to cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java index 3e128e3..2d17d0c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/common/FillBatchData.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.cluster.query.manager.common; +package org.apache.iotdb.cluster.query.common; import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.tsfile.read.common.BatchData; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java index b34afa1..2cf4e87 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java @@ -69,7 +69,6 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor { private ClusterRpcSingleQueryManager queryManager; - private static final ClusterConfig CLUSTER_CONF = ClusterDescriptor.getInstance().getConfig(); public ClusterAggregateEngineExecutor(List<Path> selectedSeries, List<String> aggres, diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java index 905ce1b..05bf9df 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java @@ -211,29 +211,26 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag @Override public void fetchBatchDataForSelectPaths(String groupId) throws RaftConnectionException { - List<String> fetchDataSeries = new ArrayList<>(); - List<Integer> selectSeriesIndexs = new ArrayList<>(); + List<Integer> fetchDataSeriesIndexs = new ArrayList<>(); List<Path> selectSeries = selectSeriesGroupEntityMap.get(groupId).getSelectPaths(); List<ClusterSelectSeriesReader> seriesReaders = selectSeriesGroupEntityMap.get(groupId) .getSelectSeriesReaders(); for (int i = 0; i < selectSeries.size(); i++) { - Path series = selectSeries.get(i); if (seriesReaders.get(i).enableFetchData()) { - fetchDataSeries.add(series.getFullPath()); - selectSeriesIndexs.add(i); + fetchDataSeriesIndexs.add(i); } } BasicRequest request = QuerySeriesDataRequest - .createFetchDataRequest(groupId, taskId, PathType.SELECT_PATH, fetchDataSeries, + .createFetchDataRequest(groupId, taskId, PathType.SELECT_PATH, fetchDataSeriesIndexs, queryRounds++); QuerySeriesDataResponse response = (QuerySeriesDataResponse) ClusterRpcReaderUtils .handleQueryRequest(request, queryNodes.get(groupId), 0); - handleFetchDataResponseForSelectPaths(groupId, selectSeriesIndexs, response); + handleFetchDataResponseForSelectPaths(groupId, fetchDataSeriesIndexs, response); } @Override - public void fetchBatchDataForFilterPaths(String groupId) throws RaftConnectionException { + public void fetchBatchDataForAllFilterPaths(String groupId) throws RaftConnectionException { BasicRequest request = QuerySeriesDataRequest .createFetchDataRequest(groupId, taskId, PathType.FILTER_PATH, null, queryRounds++); QuerySeriesDataResponse response = (QuerySeriesDataResponse) ClusterRpcReaderUtils @@ -248,27 +245,22 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag throws RaftConnectionException { for (Entry<String, SelectSeriesGroupEntity> entry : selectSeriesGroupEntityMap.entrySet()) { String groupId = entry.getKey(); - List<String> fetchDataFilterSeries = new ArrayList<>(); - entry.getValue().getSelectPaths() - .forEach(path -> fetchDataFilterSeries.add(path.getFullPath())); BasicRequest request = QuerySeriesDataByTimestampRequest - .createRequest(groupId, queryRounds++, taskId, batchTimestamp, fetchDataFilterSeries); + .createRequest(groupId, queryRounds++, taskId, batchTimestamp); QuerySeriesDataByTimestampResponse response = (QuerySeriesDataByTimestampResponse) ClusterRpcReaderUtils .handleQueryRequest(request, queryNodes.get(groupId), 0); - handleFetchDataByTimestampResponseForSelectPaths(groupId, fetchDataFilterSeries, response); + handleFetchDataByTimestampResponseForSelectPaths(groupId, response); } } /** * Handle response of fetching data, and add batch data to corresponding reader. */ - private void handleFetchDataByTimestampResponseForSelectPaths(String groupId, - List<String> fetchDataSeries, - BasicQueryDataResponse response) { + private void handleFetchDataByTimestampResponseForSelectPaths(String groupId, BasicQueryDataResponse response) { List<BatchData> batchDataList = response.getSeriesBatchData(); List<ClusterSelectSeriesReader> selectSeriesReaders = selectSeriesGroupEntityMap.get(groupId) .getSelectSeriesReaders(); - for (int i = 0; i < fetchDataSeries.size(); i++) { + for (int i = 0; i < selectSeriesReaders.size(); i++) { BatchData batchData = batchDataList.get(i); selectSeriesReaders.get(i).addBatchData(batchData, true); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java index d6ca0d7..19d8f25 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java @@ -53,11 +53,11 @@ public interface IClusterRpcSingleQueryManager { void fetchBatchDataForSelectPaths(String groupId) throws RaftConnectionException; /** - * Fetch data for filter path. + * Fetch data for all filter paths. * * @param groupId data group id */ - void fetchBatchDataForFilterPaths(String groupId) throws RaftConnectionException; + void fetchBatchDataForAllFilterPaths(String groupId) throws RaftConnectionException; /** * Fetch batch data for all select paths by batch timestamp. If target data can be fetched, skip diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java index 9f35117..1de26bd 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java @@ -20,7 +20,6 @@ package org.apache.iotdb.cluster.query.manager.coordinatornode; import java.util.ArrayList; import java.util.List; -import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterFilterSeriesReader; import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.tsfile.read.common.Path; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java index 0f2cf62..8799be2 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java @@ -30,10 +30,11 @@ import org.apache.iotdb.cluster.query.PathType; import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory; import org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterSelectSeriesBatchReader; import org.apache.iotdb.cluster.query.reader.querynode.ClusterFillSelectSeriesBatchReader; -import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReader; +import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReaderEntity; import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader; import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderByTimestamp; -import org.apache.iotdb.cluster.query.reader.querynode.IClusterFilterSeriesBatchReader; +import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderEntity; +import org.apache.iotdb.cluster.query.reader.querynode.IClusterFilterSeriesBatchReaderEntity; import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest; import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest; import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest; @@ -79,6 +80,11 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM private String groupId; /** + * Mark whether this manager has initialized or not. + */ + private boolean isInit = false; + + /** * Timer of Query, if the time is up, close query resource. */ private ScheduledFuture<?> queryTimer; @@ -94,14 +100,14 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM private long queryRound = -1; /** - * Key is series full path, value is reader of select series + * Select reader entity */ - private Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = new HashMap<>(); + private ClusterSelectSeriesBatchReaderEntity selectReaderEntity; /** - * Filter reader + * Filter reader entity */ - private IClusterFilterSeriesBatchReader filterReader; + private IClusterFilterSeriesBatchReaderEntity filterReaderEntity; /** * Key is series full path, value is data type of series @@ -127,11 +133,17 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM @Override public InitSeriesReaderResponse createSeriesReader(InitSeriesReaderRequest request) throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException, ClassNotFoundException { + if (isInit) { + throw new IOException(String + .format("ClusterLocalSingleQueryManager has already initialized. Job id = %s", jobId)); + } + isInit = true; this.groupId = request.getGroupID(); InitSeriesReaderResponse response = new InitSeriesReaderResponse(groupId); QueryContext context = new QueryContext(jobId); Map<PathType, QueryPlan> queryPlanMap = request.getAllQueryPlan(); if (queryPlanMap.containsKey(PathType.SELECT_PATH)) { + selectReaderEntity = new ClusterSelectSeriesBatchReaderEntity(); QueryPlan plan = queryPlanMap.get(PathType.SELECT_PATH); if (plan instanceof GroupByPlan) { throw new UnsupportedOperationException(); @@ -179,8 +191,9 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM fill.setDataType(dataType); fill.setQueryTime(fillQueryPlan.getQueryTime()); fill.constructReaders(queryDataSource, context); - selectSeriesReaders.put(path.getFullPath(), - new ClusterFillSelectSeriesBatchReader(dataType, fill.getFillResult())); + selectReaderEntity.addPath(path.getFullPath()); + selectReaderEntity + .addReaders(new ClusterFillSelectSeriesBatchReader(dataType, fill.getFillResult())); dataTypeMap.put(path.getFullPath(), dataType); } @@ -195,7 +208,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM */ private void handleAggreSeriesReader(QueryPlan queryPlan, QueryContext context, InitSeriesReaderResponse response) - throws FileNodeManagerException, PathErrorException, IOException, QueryFilterOptimizationException, ProcessorException { + throws FileNodeManagerException, PathErrorException, IOException, ProcessorException { if (queryPlan.getExpression() == null || queryPlan.getExpression().getType() == ExpressionType.GLOBAL_TIME) { handleAggreSeriesReaderWithoutTimeGenerator(queryPlan, context, response); @@ -227,7 +240,8 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM for (int i = 0; i < selectedPaths.size(); i++) { Path path = selectedPaths.get(i); - selectSeriesReaders.put(path.getFullPath(), + selectReaderEntity.addPath(path.getFullPath()); + selectReaderEntity.addReaders( new ClusterSelectSeriesBatchReader(dataTypes.get(i), readers.get(i))); dataTypeMap.put(path.getFullPath(), dataTypes.get(i)); } @@ -275,8 +289,8 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM String fullPath = paths.get(i).getFullPath(); IPointReader reader = AbstractExecutorWithoutTimeGenerator .createSeriesReader(context, paths.get(i), dataTypes, timeFilter); - selectSeriesReaders - .put(fullPath, new ClusterSelectSeriesBatchReader(dataTypes.get(i), reader)); + selectReaderEntity.addPath(fullPath); + selectReaderEntity.addReaders(new ClusterSelectSeriesBatchReader(dataTypes.get(i), reader)); dataTypeMap.put(fullPath, dataTypes.get(i)); } response.getSeriesDataTypes().put(PathType.SELECT_PATH, dataTypes); @@ -298,7 +312,8 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM dataTypeMap.put(paths.get(i).getFullPath(), dataTypes.get(i)); } response.getSeriesDataTypes().put(pathType, dataTypes); - filterReader = new ClusterFilterSeriesBatchReader(queryDataSet, paths, request.getFilterList()); + filterReaderEntity = new ClusterFilterSeriesBatchReaderEntity(queryDataSet, paths, + request.getFilterList()); } /** @@ -318,9 +333,9 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM EngineReaderByTimeStamp readerByTimeStamp = ClusterSeriesReaderFactory .createReaderByTimeStamp(path, context); TSDataType dataType = MManager.getInstance().getSeriesType(path.getFullPath()); - selectSeriesReaders - .put(path.getFullPath(), - new ClusterSelectSeriesBatchReaderByTimestamp(readerByTimeStamp, dataType)); + selectReaderEntity.addPath(path.getFullPath()); + selectReaderEntity + .addReaders(new ClusterSelectSeriesBatchReaderByTimestamp(readerByTimeStamp, dataType)); dataTypeMap.put(path.getFullPath(), dataType); dataTypeList.add(dataType); } @@ -336,10 +351,9 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM if (targetQueryRounds != this.queryRound) { this.queryRound = targetQueryRounds; PathType pathType = request.getPathType(); - List<String> paths = request.getSeriesPaths(); List<BatchData> batchDataList; if (pathType == PathType.SELECT_PATH) { - batchDataList = readSelectSeriesBatchData(paths); + batchDataList = readSelectSeriesBatchData(request.getSeriesPathIndexs()); } else { batchDataList = readFilterSeriesBatchData(); } @@ -355,13 +369,12 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM throws IOException { resetQueryTimer(); QuerySeriesDataByTimestampResponse response = new QuerySeriesDataByTimestampResponse(groupId); - List<String> fetchDataSeries = request.getFetchDataSeries(); long targetQueryRounds = request.getQueryRounds(); if (targetQueryRounds != this.queryRound) { this.queryRound = targetQueryRounds; + List<AbstractClusterSelectSeriesBatchReader> readers = selectReaderEntity.getAllReaders(); List<BatchData> batchDataList = new ArrayList<>(); - for (String series : fetchDataSeries) { - AbstractClusterSelectSeriesBatchReader reader = selectSeriesReaders.get(series); + for (AbstractClusterSelectSeriesBatchReader reader : readers) { batchDataList.add(reader.nextBatch(request.getBatchTimestamp())); } cachedBatchDataResult = batchDataList; @@ -378,14 +391,15 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM } /** - * Read batch data of select series + * Read batch data of select series by series index * - * @param paths all series to query + * @param seriesIndexs all series index to query */ - private List<BatchData> readSelectSeriesBatchData(List<String> paths) throws IOException { + private List<BatchData> readSelectSeriesBatchData(List<Integer> seriesIndexs) throws IOException { List<BatchData> batchDataList = new ArrayList<>(); - for (String fullPath : paths) { - batchDataList.add(selectSeriesReaders.get(fullPath).nextBatch()); + for (int index : seriesIndexs) { + AbstractClusterSelectSeriesBatchReader reader = selectReaderEntity.getReaderByIndex(index); + batchDataList.add(reader.nextBatch()); } return batchDataList; } @@ -396,7 +410,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM * @return batch data of all filter series */ private List<BatchData> readFilterSeriesBatchData() throws IOException { - return filterReader.nextBatchList(); + return filterReaderEntity.nextBatchList(); } public String getGroupId() { @@ -417,12 +431,12 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM return queryRound; } - public Map<String, AbstractClusterSelectSeriesBatchReader> getSelectSeriesReaders() { - return selectSeriesReaders; + public ClusterSelectSeriesBatchReaderEntity getSelectReaderEntity() { + return selectReaderEntity; } - public IClusterFilterSeriesBatchReader getFilterReader() { - return filterReader; + public IClusterFilterSeriesBatchReaderEntity getFilterReaderEntity() { + return filterReaderEntity; } public Map<String, TSDataType> getDataTypeMap() { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java index 0c0287e..9d60ae2 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java @@ -83,7 +83,7 @@ public class ClusterFilterSeriesReader extends AbstractClusterPointReader { @Override protected void updateCurrentBatchData() throws RaftConnectionException { if (batchDataList.isEmpty() && !remoteDataFinish) { - queryManager.fetchBatchDataForFilterPaths(groupId); + queryManager.fetchBatchDataForAllFilterPaths(groupId); } if (!batchDataList.isEmpty()) { currentBatchData = batchDataList.removeFirst(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java index 55639a1..fadd92f 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java @@ -19,7 +19,7 @@ package org.apache.iotdb.cluster.query.reader.querynode; import java.io.IOException; -import org.apache.iotdb.cluster.query.manager.common.FillBatchData; +import org.apache.iotdb.cluster.query.common.FillBatchData; import org.apache.iotdb.db.query.reader.IPointReader; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.BatchData; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java similarity index 93% rename from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java rename to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java index 1cd357e..65f8c1c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java @@ -32,9 +32,9 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; /** - * Batch reader for all filter paths. + * Batch reader entity for all filter paths. */ -public class ClusterFilterSeriesBatchReader implements IClusterFilterSeriesBatchReader { +public class ClusterFilterSeriesBatchReaderEntity implements IClusterFilterSeriesBatchReaderEntity { private List<Path> allFilterPath; @@ -44,7 +44,7 @@ public class ClusterFilterSeriesBatchReader implements IClusterFilterSeriesBatch private static final ClusterConfig CLUSTER_CONF = ClusterDescriptor.getInstance().getConfig(); - public ClusterFilterSeriesBatchReader(QueryDataSet queryDataSet, List<Path> allFilterPath, + public ClusterFilterSeriesBatchReaderEntity(QueryDataSet queryDataSet, List<Path> allFilterPath, List<Filter> filters) { this.queryDataSet = queryDataSet; this.allFilterPath = allFilterPath; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java similarity index 52% copy from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReader.java copy to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java index 218d68b..f0dea38 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java @@ -18,19 +18,45 @@ */ package org.apache.iotdb.cluster.query.reader.querynode; -import java.io.IOException; +import java.util.ArrayList; import java.util.List; -import org.apache.iotdb.tsfile.read.common.BatchData; /** - * Batch reader for filter series which is used in query node. + * Batch reader entity for all select paths. */ -public interface IClusterFilterSeriesBatchReader { - - boolean hasNext() throws IOException; +public class ClusterSelectSeriesBatchReaderEntity { + /** + * All select paths + */ + List<String> paths; /** - * Get next batch data of all filter series. + * All select readers */ - List<BatchData> nextBatchList() throws IOException; + List<AbstractClusterSelectSeriesBatchReader> readers; + + public ClusterSelectSeriesBatchReaderEntity() { + paths = new ArrayList<>(); + readers = new ArrayList<>(); + } + + public void addPath(String path) { + this.paths.add(path); + } + + public void addReaders(AbstractClusterSelectSeriesBatchReader reader) { + this.readers.add(reader); + } + + public List<AbstractClusterSelectSeriesBatchReader> getAllReaders() { + return readers; + } + + public AbstractClusterSelectSeriesBatchReader getReaderByIndex(int index){ + return readers.get(index); + } + + public List<String> getAllPaths() { + return paths; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReaderEntity.java similarity index 95% rename from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReader.java rename to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReaderEntity.java index 218d68b..a045e2a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReaderEntity.java @@ -25,7 +25,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData; /** * Batch reader for filter series which is used in query node. */ -public interface IClusterFilterSeriesBatchReader { +public interface IClusterFilterSeriesBatchReaderEntity { boolean hasNext() throws IOException; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java index 0f05cf2..7525368 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java @@ -18,7 +18,7 @@ */ package org.apache.iotdb.cluster.query.utils; -import org.apache.iotdb.cluster.query.manager.common.FillBatchData; +import org.apache.iotdb.cluster.query.common.FillBatchData; import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.db.utils.TimeValuePairUtils; import org.apache.iotdb.tsfile.read.common.BatchData; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataByTimestampRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataByTimestampRequest.java index 351e6eb..cbcef15 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataByTimestampRequest.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataByTimestampRequest.java @@ -39,21 +39,16 @@ public class QuerySeriesDataByTimestampRequest extends BasicQueryRequest { */ private List<Long> batchTimestamp; - /** - * Series to fetch data from remote query node - */ - private List<String> fetchDataSeries; - private QuerySeriesDataByTimestampRequest(String groupID) { super(groupID); } - public static QuerySeriesDataByTimestampRequest createRequest(String groupId, long queryRounds, String taskId, List<Long> batchTimestamp, List<String> fetchDataSeries){ + public static QuerySeriesDataByTimestampRequest createRequest(String groupId, long queryRounds, + String taskId, List<Long> batchTimestamp) { QuerySeriesDataByTimestampRequest request = new QuerySeriesDataByTimestampRequest(groupId); request.queryRounds = queryRounds; request.taskId = taskId; request.batchTimestamp = batchTimestamp; - request.fetchDataSeries = fetchDataSeries; return request; } @@ -80,12 +75,4 @@ public class QuerySeriesDataByTimestampRequest extends BasicQueryRequest { public void setBatchTimestamp(List<Long> batchTimestamp) { this.batchTimestamp = batchTimestamp; } - - public List<String> getFetchDataSeries() { - return fetchDataSeries; - } - - public void setFetchDataSeries(List<String> fetchDataSeries) { - this.fetchDataSeries = fetchDataSeries; - } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java index 554b8c1..e0fc23c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java @@ -46,9 +46,9 @@ public class QuerySeriesDataRequest extends BasicQueryRequest { private PathType pathType; /** - * Key is series type, value is series list + * list of series path index. */ - private List<String> seriesPaths = new ArrayList<>(); + private List<Integer> seriesPathIndexs = new ArrayList<>(); private QuerySeriesDataRequest(String groupID, String taskId) { super(groupID); @@ -56,10 +56,10 @@ public class QuerySeriesDataRequest extends BasicQueryRequest { } public static QuerySeriesDataRequest createFetchDataRequest(String groupId, String taskId, - PathType pathType, List<String> seriesPaths, long queryRounds) { + PathType pathType, List<Integer> seriesPathIndexs, long queryRounds) { QuerySeriesDataRequest request = new QuerySeriesDataRequest(groupId, taskId); request.pathType = pathType; - request.seriesPaths = seriesPaths; + request.seriesPathIndexs = seriesPathIndexs; request.queryRounds = queryRounds; return request; } @@ -88,11 +88,7 @@ public class QuerySeriesDataRequest extends BasicQueryRequest { this.pathType = pathType; } - public List<String> getSeriesPaths() { - return seriesPaths; - } - - public void setSeriesPaths(List<String> seriesPaths) { - this.seriesPaths = seriesPaths; + public List<Integer> getSeriesPathIndexs() { + return seriesPathIndexs; } } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java index 494029c..45ab923 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java @@ -163,7 +163,6 @@ public class IoTDBAggregationLargeDataIT { } @Test - @Ignore public void remoteTest() throws ClassNotFoundException, SQLException { QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); insertSQL(); @@ -183,7 +182,7 @@ public class IoTDBAggregationLargeDataIT { maxTimeAggreWithMultiFilterTest(); minValueAggreWithMultiFilterTest(); maxValueAggreWithMultiFilterTest(); -// meanAggreWithMultiFilterTest(); + meanAggreWithMultiFilterTest(); sumAggreWithMultiFilterTest(); firstAggreWithMultiFilterTest(); } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java index 02dc01f..162c5ac 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java @@ -181,13 +181,6 @@ public class IoTDBAggregationSmallDataIT { } @Test - @Ignore - public void countOnlyTimeFilterRemoteTest() throws ClassNotFoundException, SQLException { - QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); - countOnlyTimeFilterTest(); - } - - @Test public void functionsNoFilterTest() throws ClassNotFoundException, SQLException { String[] retArray = new String[]{ "0,4,0,6,1", @@ -286,13 +279,6 @@ public class IoTDBAggregationSmallDataIT { } @Test - @Ignore - public void functionsNoFilterRemoteTest() throws ClassNotFoundException, SQLException { - QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); - functionsNoFilterTest(); - } - - @Test public void lastAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException { String[] retArray = new String[]{ "0,22222,55555" @@ -322,13 +308,6 @@ public class IoTDBAggregationSmallDataIT { } @Test - @Ignore - public void lastAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException { - QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); - lastAggreWithSingleFilterTest(); - } - - @Test public void firstAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException { String[] retArray = new String[]{ "0,99,180" @@ -358,13 +337,6 @@ public class IoTDBAggregationSmallDataIT { } @Test - @Ignore - public void firstAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException { - QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); - firstAggreWithSingleFilterTest(); - } - - @Test public void sumAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException { String[] retArray = new String[]{ "0,22321.0,55934.0,1029" @@ -394,13 +366,6 @@ public class IoTDBAggregationSmallDataIT { } @Test - @Ignore - public void sumAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException { - QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); - sumAggreWithSingleFilterTest(); - } - - @Test public void meanAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException { String[] retArray = new String[]{ "0,11160.5,18645,206" @@ -430,13 +395,6 @@ public class IoTDBAggregationSmallDataIT { } @Test - @Ignore - public void meanAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException { - QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); - meanAggreWithSingleFilterTest(); - } - - @Test public void countAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException { String[] retArray = new String[]{ "0,2,3,5,1,0" @@ -468,13 +426,6 @@ public class IoTDBAggregationSmallDataIT { } @Test - @Ignore - public void countAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException { - QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); - countAggreWithSingleFilterTest(); - } - - @Test public void minTimeAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException { String[] retArray = new String[]{ "0,104,1,2,101,100" @@ -507,13 +458,6 @@ public class IoTDBAggregationSmallDataIT { } @Test - @Ignore - public void minTimeAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException { - QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); - minTimeAggreWithSingleFilterTest(); - } - - @Test public void maxTimeAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException { String[] retArray = new String[]{ "0,105,105,105,102,100" @@ -546,13 +490,6 @@ public class IoTDBAggregationSmallDataIT { } @Test - @Ignore - public void maxTimeAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException { - QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); - maxTimeAggreWithSingleFilterTest(); - } - - @Test public void minValueAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException { String[] retArray = new String[]{ "0,90,180,2.22,ddddd,true" @@ -588,14 +525,6 @@ public class IoTDBAggregationSmallDataIT { } @Test - @Ignore - public void minValueAggreWithSingleFilterRemoteTest() - throws ClassNotFoundException, SQLException { - QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); - minValueAggreWithSingleFilterTest(); - } - - @Test public void maxValueAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException { String[] retArray = new String[]{ "0,99,50000,11.11,fffff,true" @@ -630,14 +559,6 @@ public class IoTDBAggregationSmallDataIT { } @Test - @Ignore - public void maxValueAggreWithSingleFilterRemoteTest() - throws ClassNotFoundException, SQLException { - QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); - maxValueAggreWithSingleFilterTest(); - } - - @Test public void countAggreWithMultiMultiFilterTest() { String[] retArray = new String[]{ "0,2", @@ -667,14 +588,6 @@ public class IoTDBAggregationSmallDataIT { } @Test - @Ignore - public void countAggreWithMultiMultiFilterRemoteTest() - throws ClassNotFoundException, SQLException { - QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); - countAggreWithMultiMultiFilterTest(); - } - - @Test public void selectAllSQLTest() throws ClassNotFoundException, SQLException { //d0s0,d0s1,d0s2,d0s3,d1s0 String[] retArray = new String[]{ @@ -739,13 +652,6 @@ public class IoTDBAggregationSmallDataIT { } } - @Test - @Ignore - public void selectAllSQLRemoteTest() throws ClassNotFoundException, SQLException { - QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); - selectAllSQLTest(); - } - private static void insertSQL() { try (Connection connection = DriverManager.getConnection (Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root")) { diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java index b822831..e71e489 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java @@ -42,7 +42,8 @@ import org.apache.iotdb.cluster.query.manager.querynode.ClusterLocalSingleQueryM import org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterSelectSeriesBatchReader; import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader; import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderByTimestamp; -import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReader; +import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReaderEntity; +import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderEntity; import org.apache.iotdb.cluster.utils.EnvironmentUtils; import org.apache.iotdb.cluster.utils.QPExecutorUtils; import org.apache.iotdb.cluster.utils.hash.PhysicalNode; @@ -219,15 +220,15 @@ public class ClusterLocalManagerTest { assertNotNull(singleQueryManager); assertEquals((long) map.get(taskId), singleQueryManager.getJobId()); assertEquals(0, singleQueryManager.getQueryRound()); - assertNull(singleQueryManager.getFilterReader()); - Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager - .getSelectSeriesReaders(); - assertEquals(3, selectSeriesReaders.size()); + assertNull(singleQueryManager.getFilterReaderEntity()); + ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity(); + assertEquals(3, selectSeriesBatchReaderEntity.getAllReaders().size()); Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap(); - for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) { - String path = entry.getKey(); - TSDataType dataType = typeMap.get(path); - AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue(); + List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders(); + List<String> paths = selectSeriesBatchReaderEntity.getAllPaths(); + for (int i =0 ; i < readers.size(); i++) { + TSDataType dataType = typeMap.get(paths.get(i)); + AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i); assertNotNull(((ClusterSelectSeriesBatchReader) clusterBatchReader).getReader()); assertEquals(dataType, ((ClusterSelectSeriesBatchReader) clusterBatchReader).getDataType()); @@ -246,15 +247,15 @@ public class ClusterLocalManagerTest { assertNotNull(singleQueryManager); assertEquals((long) map.get(taskId), singleQueryManager.getJobId()); assertEquals(0, singleQueryManager.getQueryRound()); - assertNull(singleQueryManager.getFilterReader()); - Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager - .getSelectSeriesReaders(); - assertEquals(3, selectSeriesReaders.size()); + assertNull(singleQueryManager.getFilterReaderEntity()); + ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity(); + assertEquals(3, selectSeriesBatchReaderEntity.getAllReaders().size()); Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap(); - for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) { - String path = entry.getKey(); - TSDataType dataType = typeMap.get(path); - AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue(); + List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders(); + List<String> paths = selectSeriesBatchReaderEntity.getAllPaths(); + for (int i =0 ; i < readers.size(); i++) { + TSDataType dataType = typeMap.get(paths.get(i)); + AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i); assertNotNull(((ClusterSelectSeriesBatchReader) clusterBatchReader).getReader()); assertEquals(dataType, ((ClusterSelectSeriesBatchReader) clusterBatchReader).getDataType()); @@ -273,15 +274,15 @@ public class ClusterLocalManagerTest { assertNotNull(singleQueryManager); assertEquals((long) map.get(taskId), singleQueryManager.getJobId()); assertEquals(0, singleQueryManager.getQueryRound()); - assertNull(singleQueryManager.getFilterReader()); - Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager - .getSelectSeriesReaders(); - assertEquals(3, selectSeriesReaders.size()); + assertNull(singleQueryManager.getFilterReaderEntity()); + ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity(); + assertEquals(3, selectSeriesBatchReaderEntity.getAllReaders().size()); Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap(); - for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) { - String path = entry.getKey(); - TSDataType dataType = typeMap.get(path); - AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue(); + List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders(); + List<String> paths = selectSeriesBatchReaderEntity.getAllPaths(); + for (int i =0 ; i < readers.size(); i++) { + TSDataType dataType = typeMap.get(paths.get(i)); + AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i); assertNotNull(((ClusterSelectSeriesBatchReader) clusterBatchReader).getReader()); assertEquals(dataType, ((ClusterSelectSeriesBatchReader) clusterBatchReader).getDataType()); @@ -310,22 +311,22 @@ public class ClusterLocalManagerTest { assertNotNull(singleQueryManager); assertEquals((long) map.get(taskId), singleQueryManager.getJobId()); assertEquals(3, singleQueryManager.getQueryRound()); - ClusterFilterSeriesBatchReader filterReader = (ClusterFilterSeriesBatchReader) singleQueryManager.getFilterReader(); + ClusterFilterSeriesBatchReaderEntity filterReader = (ClusterFilterSeriesBatchReaderEntity) singleQueryManager.getFilterReaderEntity(); assertNotNull(filterReader); List<Path> allFilterPaths = new ArrayList<>(); allFilterPaths.add(new Path("root.vehicle.d0.s0")); assertTrue(allFilterPaths.containsAll(filterReader.getAllFilterPath())); assertNotNull(filterReader.getQueryDataSet()); - Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager - .getSelectSeriesReaders(); - assertNotNull(selectSeriesReaders); - assertEquals(3, selectSeriesReaders.size()); + ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity(); + assertNotNull(selectSeriesBatchReaderEntity.getAllReaders()); + assertEquals(3, selectSeriesBatchReaderEntity.getAllReaders().size()); Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap(); - for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) { - String path = entry.getKey(); - TSDataType dataType = typeMap.get(path); - AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue(); + List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders(); + List<String> paths = selectSeriesBatchReaderEntity.getAllPaths(); + for (int i =0 ; i < readers.size(); i++) { + TSDataType dataType = typeMap.get(paths.get(i)); + AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i); assertNotNull(((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getReaderByTimeStamp()); assertEquals(dataType, ((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getDataType()); @@ -344,22 +345,22 @@ public class ClusterLocalManagerTest { assertNotNull(singleQueryManager); assertEquals((long) map.get(taskId), singleQueryManager.getJobId()); assertEquals(3, singleQueryManager.getQueryRound()); - ClusterFilterSeriesBatchReader filterReader = (ClusterFilterSeriesBatchReader) singleQueryManager.getFilterReader(); + ClusterFilterSeriesBatchReaderEntity filterReader = (ClusterFilterSeriesBatchReaderEntity) singleQueryManager.getFilterReaderEntity(); assertNotNull(filterReader); List<Path> allFilterPaths = new ArrayList<>(); allFilterPaths.add(new Path("root.vehicle.d0.s0")); assertTrue(allFilterPaths.containsAll(filterReader.getAllFilterPath())); assertNotNull(filterReader.getQueryDataSet()); - Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager - .getSelectSeriesReaders(); - assertNotNull(selectSeriesReaders); - assertEquals(3, selectSeriesReaders.size()); + ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity(); + List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders(); + assertNotNull(readers); + assertEquals(3, readers.size()); Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap(); - for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) { - String path = entry.getKey(); - TSDataType dataType = typeMap.get(path); - AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue(); + List<String> paths = selectSeriesBatchReaderEntity.getAllPaths(); + for (int i =0 ; i < readers.size(); i++) { + TSDataType dataType = typeMap.get(paths.get(i)); + AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i); assertNotNull(((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getReaderByTimeStamp()); assertEquals(dataType, ((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getDataType()); @@ -378,22 +379,22 @@ public class ClusterLocalManagerTest { assertNotNull(singleQueryManager); assertEquals((long) map.get(taskId), singleQueryManager.getJobId()); assertEquals(3, singleQueryManager.getQueryRound()); - ClusterFilterSeriesBatchReader filterReader = (ClusterFilterSeriesBatchReader) singleQueryManager.getFilterReader(); + ClusterFilterSeriesBatchReaderEntity filterReader = (ClusterFilterSeriesBatchReaderEntity) singleQueryManager.getFilterReaderEntity(); assertNotNull(filterReader); List<Path> allFilterPaths = new ArrayList<>(); allFilterPaths.add(new Path("root.vehicle.d0.s0")); assertTrue(allFilterPaths.containsAll(filterReader.getAllFilterPath())); assertNotNull(filterReader.getQueryDataSet()); - Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager - .getSelectSeriesReaders(); - assertNotNull(selectSeriesReaders); - assertEquals(3, selectSeriesReaders.size()); + ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity(); + List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders(); + assertNotNull(readers); + assertEquals(3, readers.size()); Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap(); - for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) { - String path = entry.getKey(); - TSDataType dataType = typeMap.get(path); - AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue(); + List<String> paths = selectSeriesBatchReaderEntity.getAllPaths(); + for (int i =0 ; i < readers.size(); i++) { + TSDataType dataType = typeMap.get(paths.get(i)); + AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i); assertNotNull(((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getReaderByTimeStamp()); assertEquals(dataType, ((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getDataType());
