This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit aa3aa0d943626e3864753dd93ff4585e6f60e3eb Author: lta <[email protected]> AuthorDate: Fri May 17 15:08:50 2019 +0800 fix a serve bug of filter serializable --- .../executor/ClusterAggregateEngineExecutor.java | 140 ++++- .../ClusterRpcSingleQueryManager.java | 4 +- .../querynode/ClusterLocalQueryManager.java | 2 +- .../querynode/ClusterLocalSingleQueryManager.java | 25 +- .../querynode/IClusterLocalQueryManager.java | 4 +- .../querynode/IClusterLocalSingleQueryManager.java | 5 +- .../querynode/ClusterFilterSeriesBatchReader.java | 8 +- .../cluster/query/utils/ClusterRpcReaderUtils.java | 28 +- .../query/utils/QueryPlanPartitionUtils.java | 2 +- .../request/querydata/InitSeriesReaderRequest.java | 72 ++- .../apache/iotdb/cluster/integration/Constant.java | 100 ++++ .../cluster/integration/IoTDBAggregationIT.java | 640 +++++++++++++++++++++ .../cluster/integration/IoTDBFillQueryIT.java | 1 - .../db/query/executor/AggregateEngineExecutor.java | 14 +- .../timegenerator/AbstractNodeConstructor.java | 3 - 15 files changed, 996 insertions(+), 52 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java index b63b311..51113c9 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java @@ -20,10 +20,18 @@ package org.apache.iotdb.cluster.query.executor; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.iotdb.cluster.config.ClusterConfig; +import org.apache.iotdb.cluster.config.ClusterDescriptor; +import org.apache.iotdb.cluster.exception.RaftConnectionException; +import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory; 
import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; +import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity; import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader; +import org.apache.iotdb.cluster.query.timegenerator.ClusterTimeGenerator; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; @@ -41,6 +49,7 @@ import org.apache.iotdb.db.query.executor.AggregateEngineExecutor; import org.apache.iotdb.db.query.factory.AggreFuncFactory; import org.apache.iotdb.db.query.factory.SeriesReaderFactory; import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader; import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -49,10 +58,16 @@ import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; +import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; +/** + * Handle aggregation query and construct dataset in cluster + */ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor { private ClusterRpcSingleQueryManager queryManager; + private static final ClusterConfig CLUSTER_CONF = ClusterDescriptor.getInstance().getConfig(); + public ClusterAggregateEngineExecutor(List<Path> selectedSeries, List<String> aggres, IExpression expression, ClusterRpcSingleQueryManager queryManager) { @@ -80,7 +95,7 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor { paths.add(path); // construct AggregateFunction 
TSDataType tsDataType = MManager.getInstance() - .getSeriesType(selectedSeries.get(i).getFullPath()); + .getSeriesType(path.getFullPath()); AggregateFunction function = AggreFuncFactory.getAggrFuncByName(aggres.get(i), tsDataType); function.init(); @@ -103,6 +118,8 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor { AggreResultData aggreResultData = aggregateWithoutTimeGenerator(function, sequenceReader, unSeqMergeReader, timeFilter); + + dataTypes.add(aggreResultData.getDataType()); readers.add(new AggreResultDataPointReader(aggreResultData)); } } @@ -111,4 +128,125 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor { return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, readers); } + + /** + * execute aggregate function with value filter. + * + * @param context query context. + */ + @Override + public QueryDataSet executeWithTimeGenerator(QueryContext context) + throws FileNodeManagerException, PathErrorException, IOException, ProcessorException { + + /** add query token for query series which can handle locally **/ + List<Path> localQuerySeries = new ArrayList<>(selectedSeries); + Set<Path> remoteQuerySeries = queryManager.getSelectSeriesReaders().keySet(); + localQuerySeries.removeAll(remoteQuerySeries); + QueryResourceManager.getInstance() + .beginQueryOfGivenQueryPaths(context.getJobId(), localQuerySeries); + + /** add query token for filter series which can handle locally **/ + Set<String> deviceIdSet = new HashSet<>(); + for (FilterGroupEntity filterGroupEntity : queryManager.getFilterGroupEntityMap().values()) { + List<Path> remoteFilterSeries = filterGroupEntity.getFilterPaths(); + remoteFilterSeries.forEach(seriesPath -> deviceIdSet.add(seriesPath.getDevice())); + } + QueryResourceManager.getInstance() + .beginQueryOfGivenExpression(context.getJobId(), expression, deviceIdSet); + + ClusterTimeGenerator timestampGenerator; + List<EngineReaderByTimeStamp> readersOfSelectedSeries; + 
try { + timestampGenerator = new ClusterTimeGenerator(expression, context, + queryManager); + readersOfSelectedSeries = ClusterSeriesReaderFactory + .createReadersByTimestampOfSelectedPaths(selectedSeries, context, + queryManager); + } catch (IOException ex) { + throw new FileNodeManagerException(ex); + } + + /** Get data type of select paths **/ + List<TSDataType> originDataTypes = new ArrayList<>(); + Map<Path, ClusterSelectSeriesReader> selectSeriesReaders = queryManager + .getSelectSeriesReaders(); + for (Path path : selectedSeries) { + try { + if (selectSeriesReaders.containsKey(path)) { + originDataTypes.add(selectSeriesReaders.get(path).getDataType()); + } else { + originDataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath())); + } + } catch (PathErrorException e) { + throw new FileNodeManagerException(e); + } + } + + List<AggregateFunction> aggregateFunctions = new ArrayList<>(); + for (int i = 0; i < selectedSeries.size(); i++) { + TSDataType type = originDataTypes.get(i); + AggregateFunction function = AggreFuncFactory.getAggrFuncByName(aggres.get(i), type); + function.init(); + aggregateFunctions.add(function); + } + List<AggreResultData> aggreResultDataList = aggregateWithTimeGenerator(aggregateFunctions, + timestampGenerator, + readersOfSelectedSeries); + + List<IPointReader> resultDataPointReaders = new ArrayList<>(); + List<TSDataType> dataTypes = new ArrayList<>(); + for (AggreResultData resultData : aggreResultDataList) { + dataTypes.add(resultData.getDataType()); + resultDataPointReaders.add(new AggreResultDataPointReader(resultData)); + } + return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, resultDataPointReaders); + } + + /** + * calculation aggregate result with value filter. 
+ */ + @Override + protected List<AggreResultData> aggregateWithTimeGenerator( + List<AggregateFunction> aggregateFunctions, + TimeGenerator timestampGenerator, + List<EngineReaderByTimeStamp> readersOfSelectedSeries) + throws IOException { + + while (timestampGenerator.hasNext()) { + + // generate timestamps for aggregate + long[] timeArray = new long[aggregateFetchSize]; + List<Long> batchTimestamp = new ArrayList<>(); + int timeArrayLength = 0; + for (int cnt = 0; cnt < aggregateFetchSize; cnt++) { + if (!timestampGenerator.hasNext()) { + break; + } + long time = timestampGenerator.next(); + timeArray[timeArrayLength++] = time; + batchTimestamp.add(time); + } + + // fetch all remote select series data by timestamp list. + if (!batchTimestamp.isEmpty()) { + try { + queryManager.fetchBatchDataByTimestampForAllSelectPaths(batchTimestamp); + } catch (RaftConnectionException e) { + throw new IOException(e); + } + } + + // cal part of aggregate result + for (int i = 0; i < readersOfSelectedSeries.size(); i++) { + aggregateFunctions.get(i).calcAggregationUsingTimestamps(timeArray, timeArrayLength, + readersOfSelectedSeries.get(i)); + } + } + + List<AggreResultData> aggreResultDataArrayList = new ArrayList<>(); + for (AggregateFunction function : aggregateFunctions) { + aggreResultDataArrayList.add(function.getResult()); + } + return aggreResultDataArrayList; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java index faeda22..6c4f2ad 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java @@ -134,7 +134,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag * group */ 
private void initSeriesReader(int readDataConsistencyLevel) - throws RaftConnectionException { + throws RaftConnectionException, IOException { // Init all series with data group of select series,if filter series has the same data group, init them together. for (Entry<String, QueryPlan> entry : selectPathPlans.entrySet()) { String groupId = entry.getKey(); @@ -144,7 +144,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag queryNodes.put(groupId, randomPeer); Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class); allQueryPlan.put(PathType.SELECT_PATH, queryPlan); - List<Filter> filterList = null; + List<Filter> filterList = new ArrayList<>(); if (filterGroupEntityMap.containsKey(groupId)) { FilterGroupEntity filterGroupEntity = filterGroupEntityMap.get(groupId); allQueryPlan.put(PathType.FILTER_PATH, filterGroupEntity.getQueryPlan()); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java index fe3ac52..4e09af8 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java @@ -53,7 +53,7 @@ public class ClusterLocalQueryManager implements IClusterLocalQueryManager { @Override public InitSeriesReaderResponse createQueryDataSet(InitSeriesReaderRequest request) - throws IOException, FileNodeManagerException, PathErrorException, ProcessorException, QueryFilterOptimizationException { + throws IOException, FileNodeManagerException, PathErrorException, ProcessorException, QueryFilterOptimizationException, ClassNotFoundException { long jobId = QueryResourceManager.getInstance().assignJobId(); String taskId = request.getTaskId(); TASK_ID_MAP_JOB_ID.put(taskId, jobId); diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java index 76a141e..0f2cf62 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java @@ -64,9 +64,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.expression.ExpressionType; -import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; -import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.slf4j.Logger; @@ -75,7 +73,8 @@ import org.slf4j.LoggerFactory; public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryManager { - private static final Logger LOGGER = LoggerFactory.getLogger(ClusterLocalSingleQueryManager.class); + private static final Logger LOGGER = LoggerFactory + .getLogger(ClusterLocalSingleQueryManager.class); private String groupId; @@ -127,7 +126,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM @Override public InitSeriesReaderResponse createSeriesReader(InitSeriesReaderRequest request) - throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException { + throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException, ClassNotFoundException { this.groupId = request.getGroupID(); InitSeriesReaderResponse response = new 
InitSeriesReaderResponse(groupId); QueryContext context = new QueryContext(jobId); @@ -199,7 +198,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM throws FileNodeManagerException, PathErrorException, IOException, QueryFilterOptimizationException, ProcessorException { if (queryPlan.getExpression() == null || queryPlan.getExpression().getType() == ExpressionType.GLOBAL_TIME) { - handleAggreSeriesReaderWithoutTimeGenerator(queryPlan,context,response); + handleAggreSeriesReaderWithoutTimeGenerator(queryPlan, context, response); } else { handleSelectReaderWithTimeGenerator(queryPlan, context, response); } @@ -210,24 +209,23 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM * * @param queryPlan fill query plan */ - private void handleAggreSeriesReaderWithoutTimeGenerator(QueryPlan queryPlan, QueryContext context, + private void handleAggreSeriesReaderWithoutTimeGenerator(QueryPlan queryPlan, + QueryContext context, InitSeriesReaderResponse response) - throws FileNodeManagerException, PathErrorException, IOException, QueryFilterOptimizationException, ProcessorException { + throws FileNodeManagerException, PathErrorException, IOException, ProcessorException { AggregationPlan fillQueryPlan = (AggregationPlan) queryPlan; List<Path> selectedPaths = fillQueryPlan.getPaths(); QueryResourceManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedPaths); - IExpression optimizedExpression = ExpressionOptimizer.getInstance() - .optimize(fillQueryPlan.getExpression(), selectedPaths); AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor( - selectedPaths, fillQueryPlan.getAggregations(), optimizedExpression); + selectedPaths, fillQueryPlan.getAggregations(), fillQueryPlan.getExpression()); List<IPointReader> readers = engineExecutor.constructAggreReadersWithoutTimeGenerator(context); List<TSDataType> dataTypes = engineExecutor.getDataTypes(); - for (int i =0 ; i < selectedPaths.size(); i ++) 
{ + for (int i = 0; i < selectedPaths.size(); i++) { Path path = selectedPaths.get(i); selectSeriesReaders.put(path.getFullPath(), new ClusterSelectSeriesBatchReader(dataTypes.get(i), readers.get(i))); @@ -291,7 +289,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM */ private void handleFilterSeriesReader(QueryPlan plan, QueryContext context, InitSeriesReaderRequest request, InitSeriesReaderResponse response, PathType pathType) - throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException, ProcessorException, IOException { + throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException, ProcessorException, IOException, ClassNotFoundException { QueryDataSet queryDataSet = queryProcessExecutor .processQuery(plan, context); List<Path> paths = plan.getPaths(); @@ -321,7 +319,8 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM .createReaderByTimeStamp(path, context); TSDataType dataType = MManager.getInstance().getSeriesType(path.getFullPath()); selectSeriesReaders - .put(path.getFullPath(), new ClusterSelectSeriesBatchReaderByTimestamp(readerByTimeStamp, dataType)); + .put(path.getFullPath(), + new ClusterSelectSeriesBatchReaderByTimestamp(readerByTimeStamp, dataType)); dataTypeMap.put(path.getFullPath(), dataType); dataTypeList.add(dataType); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java index cc0f103..1105bb2 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java @@ -42,7 +42,7 @@ public interface IClusterLocalQueryManager { * @param request request for query data from coordinator node */ 
InitSeriesReaderResponse createQueryDataSet(InitSeriesReaderRequest request) - throws IOException, FileNodeManagerException, PathErrorException, ProcessorException, QueryFilterOptimizationException; + throws IOException, FileNodeManagerException, PathErrorException, ProcessorException, QueryFilterOptimizationException, ClassNotFoundException; /** * Read batch data of all querying series in request and set response. @@ -54,8 +54,8 @@ public interface IClusterLocalQueryManager { /** * Read batch data of select series by batch timestamp which is used in query with value filter - * @param request request of querying select paths * + * @param request request of querying select paths */ QuerySeriesDataByTimestampResponse readBatchDataByTimestamp( QuerySeriesDataByTimestampRequest request) throws IOException; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalSingleQueryManager.java index 318772f..1d89c5c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalSingleQueryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalSingleQueryManager.java @@ -40,18 +40,19 @@ public interface IClusterLocalSingleQueryManager { /** * Initially create corresponding series readers. 
+ * * @param request request of querying series data */ InitSeriesReaderResponse createSeriesReader(InitSeriesReaderRequest request) - throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException; + throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException, ClassNotFoundException; /** * <p> * Read batch data If query round in cache is equal to target query round, it means that batch * data in query node transfer to coordinator fail and return cached batch data. * </p> - * @param request request of querying series data * + * @param request request of querying series data */ QuerySeriesDataResponse readBatchData(QuerySeriesDataRequest request) throws IOException; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java index 3f21835..1cd357e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java @@ -68,12 +68,12 @@ public class ClusterFilterSeriesBatchReader implements IClusterFilterSeriesBatch batchDataList.add(new BatchData(dataTypeList.get(i), true)); } int dataPointCount = 0; - while(true){ - if(!hasNext() || dataPointCount == CLUSTER_CONF.getBatchReadSize()){ + while (true) { + if (!hasNext() || dataPointCount == CLUSTER_CONF.getBatchReadSize()) { break; } - if(hasNext() && addTimeValuePair(batchDataList, dataTypeList)){ - dataPointCount++; + if (hasNext() && addTimeValuePair(batchDataList, dataTypeList)) { + dataPointCount++; } } return batchDataList; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java index 424ba95..dca2d30 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java @@ -19,6 +19,7 @@ package org.apache.iotdb.cluster.query.utils; import com.alipay.sofa.jraft.entity.PeerId; +import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.iotdb.cluster.config.ClusterDescriptor; @@ -62,7 +63,7 @@ public class ClusterRpcReaderUtils { */ public static BasicResponse createClusterSeriesReader(String groupId, PeerId peerId, int readDataConsistencyLevel, Map<PathType, QueryPlan> allQueryPlan, String taskId, - List<Filter> filterList) throws RaftConnectionException { + List<Filter> filterList) throws RaftConnectionException, IOException { /** handle request **/ BasicRequest request = InitSeriesReaderRequest @@ -71,14 +72,35 @@ public class ClusterRpcReaderUtils { return handleQueryRequest(request, peerId, 0); } - public static QuerySeriesDataResponse fetchBatchData(String groupID, PeerId peerId, String taskId, + /** + * Fetch batch data for select series in a query without value filter or filter series. 
+ * + * @param groupId data group id + * @param peerId query node id + * @param taskId task id of query task + * @param pathType type of path + * @param fetchDataSeries series list which need to fetch data + * @param queryRounds query rounds + */ + public static QuerySeriesDataResponse fetchBatchData(String groupId, PeerId peerId, String taskId, PathType pathType, List<String> fetchDataSeries, long queryRounds) throws RaftConnectionException { BasicRequest request = QuerySeriesDataRequest - .createFetchDataRequest(groupID, taskId, pathType, fetchDataSeries, queryRounds); + .createFetchDataRequest(groupId, taskId, pathType, fetchDataSeries, queryRounds); return (QuerySeriesDataResponse) handleQueryRequest(request, peerId, 0); } + /** + * Fetch batch data corresponding to a given list of timestamp for select series in a query with + * value filter. + * + * @param groupId data group id + * @param peerId query node id + * @param taskId task id of query task + * @param queryRounds query rounds + * @param batchTimestamp list of valid timestamp + * @param fetchDataSeries series list which need to fetch data + */ public static QuerySeriesDataByTimestampResponse fetchBatchDataByTimestamp(String groupId, PeerId peerId, String taskId, long queryRounds, List<Long> batchTimestamp, List<String> fetchDataSeries) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java index fc0d401..5fbd30c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java @@ -156,7 +156,7 @@ public class QueryPlanPartitionUtils { throws PathErrorException { AggregationPlan queryPlan = (AggregationPlan) singleQueryManager.getOriginQueryPlan(); List<Path> selectPaths = queryPlan.getPaths(); - List<String> aggregations = new ArrayList<>(); + 
List<String> aggregations = queryPlan.getAggregations(); Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId(); Map<String, List<String>> selectAggregationByGroupId = new HashMap<>(); Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/InitSeriesReaderRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/InitSeriesReaderRequest.java index c974e2f..e28ac15 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/InitSeriesReaderRequest.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/InitSeriesReaderRequest.java @@ -18,10 +18,16 @@ */ package org.apache.iotdb.cluster.rpc.raft.request.querydata; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.EnumMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.apache.iotdb.cluster.query.PathType; import org.apache.iotdb.cluster.rpc.raft.request.BasicQueryRequest; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; @@ -42,12 +48,12 @@ public class InitSeriesReaderRequest extends BasicQueryRequest { /** * Key is series type, value is query plan */ - private Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class); + private Map<PathType, byte[]> allQueryPlan = new EnumMap<>(PathType.class); /** * Represent all filter of leaf node in filter tree while executing a query with value filter. 
*/ - private List<Filter> filterList = new ArrayList<>(); + private List<byte[]> filterList = new ArrayList<>(); private InitSeriesReaderRequest(String groupID, String taskId) { @@ -55,12 +61,17 @@ public class InitSeriesReaderRequest extends BasicQueryRequest { this.taskId = taskId; } - public static InitSeriesReaderRequest createInitialQueryRequest(String groupId, String taskId, int readConsistencyLevel, - Map<PathType, QueryPlan> allQueryPlan, List<Filter> filterList){ + public static InitSeriesReaderRequest createInitialQueryRequest(String groupId, String taskId, + int readConsistencyLevel, + Map<PathType, QueryPlan> allQueryPlan, List<Filter> filterList) throws IOException { InitSeriesReaderRequest request = new InitSeriesReaderRequest(groupId, taskId); request.setReadConsistencyLevel(readConsistencyLevel); - request.allQueryPlan = allQueryPlan; - request.filterList = filterList; + for (Entry<PathType, QueryPlan> entry : allQueryPlan.entrySet()) { + request.allQueryPlan.put(entry.getKey(), toByteArray(entry.getValue())); + } + for (Filter filter : filterList) { + request.filterList.add(toByteArray(filter)); + } return request; } @@ -72,20 +83,51 @@ public class InitSeriesReaderRequest extends BasicQueryRequest { this.taskId = taskId; } - public Map<PathType, QueryPlan> getAllQueryPlan() { - return allQueryPlan; + public Map<PathType, QueryPlan> getAllQueryPlan() throws IOException, ClassNotFoundException { + Map<PathType, QueryPlan> queryPlanMap = new EnumMap<>(PathType.class); + for (Entry<PathType, byte[]> entry : allQueryPlan.entrySet()) { + queryPlanMap.put(entry.getKey(), (QueryPlan) toObject(entry.getValue())); + } + return queryPlanMap; } - public void setAllQueryPlan( - Map<PathType, QueryPlan> allQueryPlan) { - this.allQueryPlan = allQueryPlan; + public List<Filter> getFilterList() throws IOException, ClassNotFoundException { + List<Filter> filters = new ArrayList<>(); + for (byte[] filterBytes : filterList) { + filters.add((Filter) 
toObject(filterBytes)); + } + return filters; } - public List<Filter> getFilterList() { - return filterList; + /** + * Convert an object to byte array + * + * @param obj Object, which need to implement Serializable + * @return byte array of object + */ + private static byte[] toByteArray(Object obj) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(obj); + oos.flush(); + byte[] bytes = bos.toByteArray(); + oos.close(); + bos.close(); + return bytes; } - public void setFilterList(List<Filter> filterList) { - this.filterList = filterList; + /** + * Convert byte array back to Object + * + * @param bytes byte array of object + * @return object + */ + private static Object toObject(byte[] bytes) throws IOException, ClassNotFoundException { + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + ObjectInputStream ois = new ObjectInputStream(bis); + Object obj = ois.readObject(); + ois.close(); + bis.close(); + return obj; } } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/Constant.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/Constant.java new file mode 100644 index 0000000..71cf523 --- /dev/null +++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/Constant.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.integration; + +import org.apache.iotdb.tsfile.write.record.TSRecord; +import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; + +public class Constant { + + public static final String d0s0 = "root.vehicle.d0.s0"; + public static final String d0s1 = "root.vehicle.d0.s1"; + public static final String d0s2 = "root.vehicle.d0.s2"; + public static final String d0s3 = "root.vehicle.d0.s3"; + public static final String d0s4 = "root.vehicle.d0.s4"; + public static final String d0s5 = "root.vehicle.d0.s5"; + public static final String d1s0 = "root.vehicle.d1.s0"; + public static final String d1s1 = "root.vehicle.d1.s1"; + public static final String TIMESTAMP_STR = "Time"; + public static boolean testFlag = true; + public static String[] stringValue = new String[]{"A", "B", "C", "D", "E"}; + public static String[] booleanValue = new String[]{"true", "false"}; + + public static String[] create_sql = new String[]{"SET STORAGE GROUP TO root.vehicle", + + "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE", + "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE", + "CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE", + "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, ENCODING=PLAIN", + "CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN", + "CREATE TIMESERIES root.vehicle.d0.s5 WITH DATATYPE=DOUBLE, ENCODING=RLE", + "CREATE TIMESERIES root.vehicle.d1.s0 WITH DATATYPE=INT32, ENCODING=RLE", + "CREATE TIMESERIES 
root.vehicle.d1.s1 WITH DATATYPE=INT64, ENCODING=RLE", + + }; + + public static String insertTemplate = "insert into %s(timestamp%s) values(%d%s)"; + + public static String first(String path) { + return String.format("first(%s)", path); + } + + public static String last(String path) { + return String.format("last(%s)", path); + } + + public static String sum(String path) { + return String.format("sum(%s)", path); + } + + public static String mean(String path) { + return String.format("mean(%s)", path); + } + + public static String count(String path) { + return String.format("count(%s)", path); + } + + public static String max_time(String path) { + return String.format("max_time(%s)", path); + } + + public static String min_time(String path) { + return String.format("min_time(%s)", path); + } + + public static String max_value(String path) { + return String.format("max_value(%s)", path); + } + + public static String min_value(String path) { + return String.format("min_value(%s)", path); + } + + public static String recordToInsert(TSRecord record) { + StringBuilder measurements = new StringBuilder(); + StringBuilder values = new StringBuilder(); + for (DataPoint dataPoint : record.dataPointList) { + measurements.append(",").append(dataPoint.getMeasurementId()); + values.append(",").append(dataPoint.getValue()); + } + return String + .format(insertTemplate, record.deviceId, measurements.toString(), record.time, values); + } +} diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationIT.java new file mode 100644 index 0000000..bf7c4da --- /dev/null +++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationIT.java @@ -0,0 +1,640 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.integration; + +import static org.apache.iotdb.cluster.integration.Constant.count; +import static org.apache.iotdb.cluster.integration.Constant.first; +import static org.apache.iotdb.cluster.integration.Constant.last; +import static org.apache.iotdb.cluster.integration.Constant.max_time; +import static org.apache.iotdb.cluster.integration.Constant.max_value; +import static org.apache.iotdb.cluster.integration.Constant.mean; +import static org.apache.iotdb.cluster.integration.Constant.min_time; +import static org.apache.iotdb.cluster.integration.Constant.min_value; +import static org.apache.iotdb.cluster.integration.Constant.sum; +import static org.apache.iotdb.cluster.utils.Utils.insertData; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import org.apache.iotdb.cluster.config.ClusterConfig; +import org.apache.iotdb.cluster.config.ClusterDescriptor; +import org.apache.iotdb.cluster.entity.Server; +import org.apache.iotdb.cluster.utils.EnvironmentUtils; +import org.apache.iotdb.cluster.utils.QPExecutorUtils; +import org.apache.iotdb.cluster.utils.hash.PhysicalNode; +import 
org.apache.iotdb.jdbc.Config; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class IoTDBAggregationIT { + + private Server server; + private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig(); + private static final PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(), + CLUSTER_CONFIG.getPort()); + + private static String[] creationSqls = new String[]{ + "SET STORAGE GROUP TO root.vehicle.d0", + "SET STORAGE GROUP TO root.vehicle.d1", + + "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE", + "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE", + "CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE", + "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, ENCODING=PLAIN" + }; + + private static String[] dataSet2 = new String[]{ + "SET STORAGE GROUP TO root.ln.wf01.wt01", + "CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN", + "CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=FLOAT, ENCODING=PLAIN", + "CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32, ENCODING=PLAIN", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(1, 1.1, false, 11)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(2, 2.2, true, 22)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(3, 3.3, false, 33 )", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(4, 4.4, false, 44)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(5, 5.5, false, 55)" + }; + + private String insertTemplate = "INSERT INTO root.vehicle.d0(timestamp,s0,s1,s2,s3)" + + " VALUES(%d,%d,%d,%f,%s)"; + + private static final String TIMESTAMP_STR = "Time"; + private final String d0s0 = "root.vehicle.d0.s0"; + 
private final String d0s1 = "root.vehicle.d0.s1"; + private final String d0s2 = "root.vehicle.d0.s2"; + private final String d0s3 = "root.vehicle.d0.s3"; + private static final String TEMPERATURE_STR = "root.ln.wf01.wt01.temperature"; + + @Before + public void setUp() throws Exception { + EnvironmentUtils.cleanEnv(); + EnvironmentUtils.closeStatMonitor(); + EnvironmentUtils.closeMemControl(); + CLUSTER_CONFIG.createAllPath(); + server = Server.getInstance(); + server.start(); + EnvironmentUtils.envSetUp(); + Class.forName(Config.JDBC_DRIVER_NAME); + prepareData(); + } + + @After + public void tearDown() throws Exception { + server.stop(); + QPExecutorUtils.setLocalNodeAddr(localNode.getIp(), localNode.getPort()); + EnvironmentUtils.cleanEnv(); + } + + @Test + public void test() throws SQLException { + String[] retArray = new String[]{ + "0,2", + "0,4", + "0,3" + }; + Connection connection = null; + try { + connection = DriverManager. + getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement(); + boolean hasResultSet = statement.execute( + "select count(temperature) from root.ln.wf01.wt01 where time > 3"); + + Assert.assertTrue(hasResultSet); + ResultSet resultSet = statement.getResultSet(); + int cnt = 0; + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + + resultSet.getString(count(TEMPERATURE_STR)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(1, cnt); + statement.close(); + + statement = connection.createStatement(); + hasResultSet = statement.execute( + "select min_time(temperature) from root.ln.wf01.wt01 where time > 3"); + + Assert.assertTrue(hasResultSet); + resultSet = statement.getResultSet(); + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + + resultSet.getString(min_time(TEMPERATURE_STR)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(2, cnt); + 
statement.close(); + + statement = connection.createStatement(); + hasResultSet = statement.execute( + "select min_time(temperature) from root.ln.wf01.wt01 where temperature > 3"); + + Assert.assertTrue(hasResultSet); + resultSet = statement.getResultSet(); + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + + resultSet.getString(min_time(TEMPERATURE_STR)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(3, cnt); + statement.close(); + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + @Test + public void remoteTest() throws SQLException { + QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); + test(); + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Test + public void countTest() throws SQLException { + String[] retArray = new String[]{ + "0,2001,2001,2001,2001", + "0,7500,7500,7500,7500" + }; + Connection connection = null; + try { + connection = DriverManager. 
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement(); + boolean hasResultSet = statement.execute("select count(s0),count(s1),count(s2),count(s3) " + + "from root.vehicle.d0 where time >= 6000 and time <= 9000"); + + Assert.assertTrue(hasResultSet); + ResultSet resultSet = statement.getResultSet(); + int cnt = 0; + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(count(d0s0)) + + "," + resultSet.getString(count(d0s1)) + "," + resultSet.getString(count(d0s2)) + + "," + resultSet.getString(count(d0s3)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(1, cnt); + statement.close(); + + statement = connection.createStatement(); + hasResultSet = statement.execute("select count(s0),count(s1),count(s2),count(s3) " + + "from root.vehicle.d0"); + + Assert.assertTrue(hasResultSet); + resultSet = statement.getResultSet(); + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(count(d0s0)) + + "," + resultSet.getString(count(d0s1)) + "," + resultSet.getString(count(d0s2)) + + "," + resultSet.getString(count(d0s3)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(2, cnt); + statement.close(); + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + @Test + public void countRemoteTest() throws SQLException { + QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); + countTest(); + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + + @Test + public void firstTest() throws SQLException { + String[] retArray = new String[]{ + "0,2000,2000,2000.0,2000", + "0,500,500,500.0,500" + }; + Connection connection = null; + try { + connection = DriverManager. 
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement(); + boolean hasResultSet = statement.execute("select first(s0),first(s1),first(s2),first(s3) " + + "from root.vehicle.d0 where time >= 1500 and time <= 9000"); + + Assert.assertTrue(hasResultSet); + ResultSet resultSet = statement.getResultSet(); + int cnt = 0; + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(first(d0s0)) + + "," + resultSet.getString(first(d0s1)) + "," + resultSet.getString(first(d0s2)) + + "," + resultSet.getString(first(d0s3)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(1, cnt); + statement.close(); + + statement = connection.createStatement(); + hasResultSet = statement.execute("select first(s0),first(s1),first(s2),first(s3) " + + "from root.vehicle.d0"); + + Assert.assertTrue(hasResultSet); + resultSet = statement.getResultSet(); + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(first(d0s0)) + + "," + resultSet.getString(first(d0s1)) + "," + resultSet.getString(first(d0s2)) + + "," + resultSet.getString(first(d0s3)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(2, cnt); + statement.close(); + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + @Test + public void firstRemoteTest() throws SQLException { + QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); + firstTest(); + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Test + public void lastTest() throws SQLException { + String[] retArray = new String[]{ + "0,8499,8499.0", + "0,1499,1499.0", + "0,2200,2200.0" + }; + Connection connection = null; + try { + connection = DriverManager. 
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement(); + boolean hasResultSet = statement.execute("select last(s0),last(s2) " + + "from root.vehicle.d0 where time >= 1500 and time < 9000"); + + Assert.assertTrue(hasResultSet); + ResultSet resultSet = statement.getResultSet(); + int cnt = 0; + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(last(d0s0)) + + "," + resultSet.getString(last(d0s2)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(1, cnt); + statement.close(); + + statement = connection.createStatement(); + hasResultSet = statement.execute("select last(s0),last(s2) " + + "from root.vehicle.d0 where time <= 1600"); + + Assert.assertTrue(hasResultSet); + resultSet = statement.getResultSet(); + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(last(d0s0)) + + "," + resultSet.getString(last(d0s2)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(2, cnt); + statement.close(); + + statement = connection.createStatement(); + hasResultSet = statement.execute("select last(s0),last(s2) " + + "from root.vehicle.d0 where time <= 2200"); + + Assert.assertTrue(hasResultSet); + resultSet = statement.getResultSet(); + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(last(d0s0)) + + "," + resultSet.getString(last(d0s2)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(3, cnt); + statement.close(); + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + @Test + public void lastRemoteTest() throws SQLException { + QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); + lastTest(); + try { + Thread.sleep(200); + } catch (InterruptedException e) { + 
e.printStackTrace(); + } + } + + @Test + public void maxminTimeTest() throws SQLException { + String[] retArray = new String[]{ + "0,8499,500", + "0,2499,2000" + }; + Connection connection = null; + try { + connection = DriverManager. + getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement(); + boolean hasResultSet = statement.execute("select max_time(s0),min_time(s2) " + + "from root.vehicle.d0 where time >= 100 and time < 9000"); + + Assert.assertTrue(hasResultSet); + ResultSet resultSet = statement.getResultSet(); + int cnt = 0; + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(max_time(d0s0)) + + "," + resultSet.getString(min_time(d0s2)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(1, cnt); + statement.close(); + + statement = connection.createStatement(); + hasResultSet = statement.execute("select max_time(s0),min_time(s2) " + + "from root.vehicle.d0 where time <= 2500 and time > 1800"); + + Assert.assertTrue(hasResultSet); + resultSet = statement.getResultSet(); + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(max_time(d0s0)) + + "," + resultSet.getString(min_time(d0s2)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(2, cnt); + statement.close(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + @Test + public void maxminTimeRemoteTest() throws SQLException { + QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); + maxminTimeTest(); + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Test + public void maxminValueTest() throws SQLException { + String[] retArray = new String[]{ + "0,8499,500.0", + "0,2499,500.0" + }; + Connection connection = null; + try { + connection = 
DriverManager. + getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement(); + boolean hasResultSet = statement.execute("select max_value(s0),min_value(s2) " + + "from root.vehicle.d0 where time >= 100 and time < 9000"); + + Assert.assertTrue(hasResultSet); + ResultSet resultSet = statement.getResultSet(); + int cnt = 0; + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(max_value(d0s0)) + + "," + resultSet.getString(min_value(d0s2)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(1, cnt); + statement.close(); + + statement = connection.createStatement(); + hasResultSet = statement.execute("select max_value(s0),min_value(s2) " + + "from root.vehicle.d0 where time < 2500"); + + Assert.assertTrue(hasResultSet); + resultSet = statement.getResultSet(); + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(max_value(d0s0)) + + "," + resultSet.getString(min_value(d0s2)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(2, cnt); + statement.close(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + @Test + public void maxminValueRemoteTest() throws SQLException { + QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); + maxminValueTest(); + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Test + public void meanSumTest() throws SQLException { + String[] retArray = new String[]{ + "0,1.4508E7,7250.374812593703", + "0,626750.0,1250.998003992016" + }; + Connection connection = null; + try { + connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement(); + boolean hasResultSet = statement.execute("select sum(s0),mean(s2)" + + "from root.vehicle.d0 where time >= 6000 and time <= 9000"); + + Assert.assertTrue(hasResultSet); + ResultSet resultSet = statement.getResultSet(); + int cnt = 0; + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(sum(d0s0)) + + "," + resultSet.getString(mean(d0s2)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(1, cnt); + statement.close(); + + statement = connection.createStatement(); + hasResultSet = statement.execute("select sum(s0),mean(s2)" + + "from root.vehicle.d0 where time >= 1000 and time <= 2000"); + + Assert.assertTrue(hasResultSet); + resultSet = statement.getResultSet(); + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(sum(d0s0)) + + "," + resultSet.getString(mean(d0s2)); + Assert.assertEquals(retArray[cnt], ans); + cnt++; + } + Assert.assertEquals(2, cnt); + statement.close(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + @Test + public void meanSumRemoteTest() throws SQLException { + QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); + meanSumTest(); + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + private void prepareData() throws SQLException { + Connection connection = null; + try { + connection = DriverManager + .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", + "root"); + Statement statement = connection.createStatement(); + insertData(connection, creationSqls, dataSet2); + // prepare BufferWrite file + for (int i = 5000; i < 7000; i++) { + statement.addBatch(String.format(insertTemplate, i, i, i, (double) i, "\'" + i + "\'")); + } + 
statement.executeBatch(); + statement.clearBatch(); + statement.execute("flush"); + for (int i = 7500; i < 8500; i++) { + statement.addBatch(String.format(insertTemplate, i, i, i, (double) i, "\'" + i + "\'")); + } + statement.executeBatch(); + statement.clearBatch(); + statement.execute("flush"); + + // prepare Unseq-File + for (int i = 500; i < 1500; i++) { + statement.addBatch(String.format(insertTemplate, i, i, i, (double) i, "\'" + i + "\'")); + } + statement.executeBatch(); + statement.clearBatch(); + statement.execute("flush"); + for (int i = 3000; i < 6500; i++) { + statement.addBatch(String.format(insertTemplate, i, i, i, (double) i, "\'" + i + "\'")); + } + statement.executeBatch(); + statement.clearBatch(); + statement.execute("merge"); + + // prepare BufferWrite cache + for (int i = 9000; i < 10000; i++) { + statement.addBatch(String.format(insertTemplate, i, i, i, (double) i, "\'" + i + "\'")); + } + statement.executeBatch(); + statement.clearBatch(); + // prepare Overflow cache + for (int i = 2000; i < 2500; i++) { + statement.addBatch(String.format(insertTemplate, i, i, i, (double) i, "\'" + i + "\'")); + } + statement.executeBatch(); + statement.clearBatch(); + statement.close(); + + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (connection != null) { + connection.close(); + } + } + } +} diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBFillQueryIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBFillQueryIT.java index f5bf17f..ba8746d 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBFillQueryIT.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBFillQueryIT.java @@ -40,7 +40,6 @@ import org.junit.Test; public class IoTDBFillQueryIT { - private Server server; private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig(); private static final PhysicalNode localNode = new 
PhysicalNode(CLUSTER_CONFIG.getIp(), diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java index 6c458a5..401b056 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java @@ -50,18 +50,22 @@ import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; +import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; +/** + * Handle aggregation query and construct dataset + */ public class AggregateEngineExecutor { protected List<Path> selectedSeries; protected List<String> aggres; protected IExpression expression; - protected List<TSDataType> dataTypes; + private List<TSDataType> dataTypes; /** * aggregation batch calculation size. **/ - private int aggregateFetchSize; + protected int aggregateFetchSize; /** * constructor. @@ -317,9 +321,9 @@ public class AggregateEngineExecutor { /** * calculation aggregate result with value filter. 
*/ - private List<AggreResultData> aggregateWithTimeGenerator( + protected List<AggreResultData> aggregateWithTimeGenerator( List<AggregateFunction> aggregateFunctions, - EngineTimeGenerator timestampGenerator, + TimeGenerator timestampGenerator, List<EngineReaderByTimeStamp> readersOfSelectedSeries) throws IOException { @@ -335,6 +339,8 @@ public class AggregateEngineExecutor { timeArray[timeArrayLength++] = timestampGenerator.next(); } + + // cal part of aggregate result for (int i = 0; i < readersOfSelectedSeries.size(); i++) { aggregateFunctions.get(i).calcAggregationUsingTimestamps(timeArray, timeArrayLength, diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/AbstractNodeConstructor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/AbstractNodeConstructor.java index c23c2b9..5d83314 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/AbstractNodeConstructor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/AbstractNodeConstructor.java @@ -58,10 +58,7 @@ public abstract class AbstractNodeConstructor { /** * Construct not series type node. -<<<<<<< HEAD -======= * ->>>>>>> master * @param expression expression * @return Node object * @throws FileNodeManagerException FileNodeManagerException
