This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 167a30e2f30d89dc91670a731fbb7fadac161a27 Author: lta <[email protected]> AuthorDate: Thu May 16 14:57:41 2019 +0800 add fill feature --- .../query/executor/ClusterFillEngineExecutor.java | 66 ++-- .../cluster/query/executor/ClusterQueryRouter.java | 13 +- .../query/manager/common/FillBatchData.java | 65 ++++ .../querynode/ClusterLocalSingleQueryManager.java | 86 +++-- .../AbstractClusterPointReader.java | 5 +- ...=> AbstractClusterSelectSeriesBatchReader.java} | 2 +- ...ava => ClusterFillSelectSeriesBatchReader.java} | 26 +- ...or.java => ClusterSelectSeriesBatchReader.java} | 13 +- ...ClusterSelectSeriesBatchReaderByTimestamp.java} | 7 +- .../query/utils/ClusterTimeValuePairUtils.java | 26 ++ .../query/utils/QueryPlanPartitionUtils.java | 41 ++- .../cluster/integration/IoTDBFillQueryIT.java | 366 +++++++++++++++++++++ .../IoTDBQueryIT.java} | 4 +- .../IoTDBQueryLargeDataIT.java} | 4 +- .../query/manager/ClusterLocalManagerTest.java | 66 ++-- .../db/query/executor/FillEngineExecutor.java | 11 +- .../db/query/executor/IFillEngineExecutor.java | 23 +- .../java/org/apache/iotdb/db/query/fill/IFill.java | 14 +- .../org/apache/iotdb/db/query/fill/LinearFill.java | 4 +- .../apache/iotdb/db/query/fill/PreviousFill.java | 6 +- 20 files changed, 701 insertions(+), 147 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterFillEngineExecutor.java similarity index 57% copy from iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java copy to cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterFillEngineExecutor.java index 83c5fa9..771637e 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterFillEngineExecutor.java @@ -16,13 +16,14 @@ * specific language governing permissions and limitations * under the License. */ - -package org.apache.iotdb.db.query.executor; +package org.apache.iotdb.cluster.query.executor; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; +import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; @@ -30,6 +31,7 @@ import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator; +import org.apache.iotdb.db.query.executor.IFillEngineExecutor; import org.apache.iotdb.db.query.fill.IFill; import org.apache.iotdb.db.query.fill.PreviousFill; import org.apache.iotdb.db.query.reader.IPointReader; @@ -37,55 +39,57 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; -public class FillEngineExecutor { +public class ClusterFillEngineExecutor implements IFillEngineExecutor { - private long jobId; private List<Path> selectedSeries; private long queryTime; private Map<TSDataType, IFill> typeIFillMap; + private ClusterRpcSingleQueryManager queryManager; + - public FillEngineExecutor(long jobId, List<Path> selectedSeries, long queryTime, - Map<TSDataType, IFill> typeIFillMap) { - this.jobId = jobId; + public ClusterFillEngineExecutor(List<Path> selectedSeries, long queryTime, + Map<TSDataType, IFill> typeIFillMap, ClusterRpcSingleQueryManager queryManager) { this.selectedSeries = selectedSeries; this.queryTime = queryTime; this.typeIFillMap = typeIFillMap; + this.queryManager = queryManager; } - /** - * execute fill. - * - * @param context query context - */ + @Override public QueryDataSet execute(QueryContext context) throws FileNodeManagerException, PathErrorException, IOException { - QueryResourceManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedSeries); - + Map<Path, ClusterSelectSeriesReader> selectPathReaders = queryManager.getSelectSeriesReaders(); + List<Path> paths = new ArrayList<>(); List<IFill> fillList = new ArrayList<>(); List<TSDataType> dataTypeList = new ArrayList<>(); + List<IPointReader> readers = new ArrayList<>(); for (Path path : selectedSeries) { - QueryDataSource queryDataSource = QueryResourceManager.getInstance() - .getQueryDataSource(path, context); - TSDataType dataType = MManager.getInstance().getSeriesType(path.getFullPath()); - dataTypeList.add(dataType); - IFill fill = null; - if (!typeIFillMap.containsKey(dataType)) { - fill = new PreviousFill(dataType, queryTime, 0); + if (selectPathReaders.containsKey(path)) { + ClusterSelectSeriesReader reader = selectPathReaders.get(path); + readers.add(reader); + dataTypeList.add(reader.getDataType()); } else { - fill = typeIFillMap.get(dataType).copy(path); + QueryDataSource queryDataSource = QueryResourceManager.getInstance() + .getQueryDataSource(path, context); + TSDataType dataType = MManager.getInstance().getSeriesType(path.getFullPath()); + dataTypeList.add(dataType); + IFill fill; + if (!typeIFillMap.containsKey(dataType)) { + fill = new PreviousFill(dataType, queryTime, 0); + } else { + fill = typeIFillMap.get(dataType).copy(path); + } + fill.setDataType(dataType); + fill.setQueryTime(queryTime); + fill.constructReaders(queryDataSource, context); + fillList.add(fill); + readers.add(fill.getFillResult()); } - fill.setDataType(dataType); - fill.setQueryTime(queryTime); - fill.constructReaders(queryDataSource, context); - fillList.add(fill); } - List<IPointReader> readers = new ArrayList<>(); - for (IFill fill : fillList) { - readers.add(fill.getFillResult()); - } + QueryResourceManager.getInstance() + .beginQueryOfGivenQueryPaths(context.getJobId(), paths); return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypeList, readers); } - } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java index 4211528..2fa4576 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.ProcessorException; import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.executor.FillEngineExecutor; import org.apache.iotdb.db.query.executor.IEngineQueryRouter; import org.apache.iotdb.db.query.fill.IFill; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; @@ -108,7 +109,17 @@ public class ClusterQueryRouter implements IEngineQueryRouter { @Override public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType, QueryContext context) throws FileNodeManagerException, PathErrorException, IOException { - throw new UnsupportedOperationException(); + ClusterRpcSingleQueryManager queryManager = ClusterRpcQueryManager.getInstance() + .getSingleQuery(context.getJobId()); + try { + queryManager.initQueryResource(QueryType.NO_FILTER, getReadDataConsistencyLevel()); + + ClusterFillEngineExecutor fillEngineExecutor = new ClusterFillEngineExecutor(fillPaths, queryTime, + fillType, queryManager); + return fillEngineExecutor.execute(context); + } catch (IOException | RaftConnectionException e) { + throw new FileNodeManagerException(e); + } } public int getReadDataConsistencyLevel() { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/common/FillBatchData.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/common/FillBatchData.java new file mode 100644 index 0000000..3e128e3 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/common/FillBatchData.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.query.manager.common; + +import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.tsfile.read.common.BatchData; + +/** + * <code>FillBatchData</code> is a self-defined data structure which is used in cluster query + * process of fill type, which only contains one TimeValuePair and value can be null. + */ +public class FillBatchData extends BatchData { + + private TimeValuePair timeValuePair; + private boolean isUsed; + + public FillBatchData(TimeValuePair timeValuePair, boolean isUsed) { + this.timeValuePair = timeValuePair; + this.isUsed = isUsed; + } + + @Override + public boolean hasNext() { + return !isUsed; + } + + @Override + public void next() { + isUsed = true; + } + + @Override + public long currentTime() { + return timeValuePair.getTimestamp(); + } + + @Override + public Object currentValue() { + if (!isUsed) { + return timeValuePair.getValue() == null ? null : timeValuePair.getValue().getValue(); + } else { + return null; + } + } + + public TimeValuePair getTimeValuePair() { + return isUsed ? null : timeValuePair; + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java index 37e3c57..f776477 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java @@ -28,9 +28,10 @@ import org.apache.iotdb.cluster.concurrent.pool.QueryTimerManager; import org.apache.iotdb.cluster.config.ClusterConstant; import org.apache.iotdb.cluster.query.PathType; import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory; -import org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterBatchReader; -import org.apache.iotdb.cluster.query.reader.querynode.ClusterBatchReaderByTimestamp; -import org.apache.iotdb.cluster.query.reader.querynode.ClusterBatchReaderWithoutTimeGenerator; +import org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterSelectSeriesBatchReader; +import org.apache.iotdb.cluster.query.reader.querynode.ClusterFillSelectSeriesBatchReader; +import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader; +import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderByTimestamp; import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReader; import org.apache.iotdb.cluster.query.reader.querynode.IClusterFilterSeriesBatchReader; import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest; @@ -39,6 +40,7 @@ import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataReques import org.apache.iotdb.cluster.rpc.raft.response.querydata.InitSeriesReaderResponse; import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataByTimestampResponse; import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataResponse; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.ProcessorException; @@ -46,11 +48,14 @@ import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.qp.executor.OverflowQPExecutor; import org.apache.iotdb.db.qp.executor.QueryProcessExecutor; import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; +import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan; import org.apache.iotdb.db.qp.physical.crud.GroupByPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.executor.AbstractExecutorWithoutTimeGenerator; +import org.apache.iotdb.db.query.fill.IFill; +import org.apache.iotdb.db.query.fill.PreviousFill; import org.apache.iotdb.db.query.reader.IPointReader; import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; @@ -89,7 +94,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM /** * Key is series full path, value is reader of select series */ - private Map<String, AbstractClusterBatchReader> selectSeriesReaders = new HashMap<>(); + private Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = new HashMap<>(); /** * Filter reader @@ -130,6 +135,8 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM throw new UnsupportedOperationException(); } else if (plan instanceof AggregationPlan) { throw new UnsupportedOperationException(); + } else if (plan instanceof FillQueryPlan) { + handleFillSeriesRerader(plan, context, response); } else { if (plan.getExpression() == null || plan.getExpression().getType() == ExpressionType.GLOBAL_TIME) { @@ -147,22 +154,40 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM } /** - * Handle filter series reader + * Handle fill series reader * - * @param plan filter series query plan + * @param queryPlan fill query plan */ - private void handleFilterSeriesReader(QueryPlan plan, QueryContext context, - InitSeriesReaderRequest request, InitSeriesReaderResponse response, PathType pathType) - throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException, ProcessorException, IOException { - QueryDataSet queryDataSet = queryProcessExecutor - .processQuery(plan, context); - List<Path> paths = plan.getPaths(); - List<TSDataType> dataTypes = queryDataSet.getDataTypes(); - for (int i = 0; i < paths.size(); i++) { - dataTypeMap.put(paths.get(i).getFullPath(), dataTypes.get(i)); + private void handleFillSeriesRerader(QueryPlan queryPlan, QueryContext context, + InitSeriesReaderResponse response) + throws FileNodeManagerException, PathErrorException, IOException { + FillQueryPlan fillQueryPlan = (FillQueryPlan) queryPlan; + + List<Path> selectedPaths = queryPlan.getPaths(); + List<TSDataType> dataTypes = new ArrayList<>(); + QueryResourceManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedPaths); + + Map<TSDataType, IFill> typeIFillMap = fillQueryPlan.getFillType(); + for (Path path : selectedPaths) { + QueryDataSource queryDataSource = QueryResourceManager.getInstance() + .getQueryDataSource(path, context); + TSDataType dataType = MManager.getInstance().getSeriesType(path.getFullPath()); + dataTypes.add(dataType); + IFill fill; + if (!typeIFillMap.containsKey(dataType)) { + fill = new PreviousFill(dataType, fillQueryPlan.getQueryTime(), 0); + } else { + fill = typeIFillMap.get(dataType).copy(path); + } + fill.setDataType(dataType); + fill.setQueryTime(fillQueryPlan.getQueryTime()); + fill.constructReaders(queryDataSource, context); + selectSeriesReaders.put(path.getFullPath(), + new ClusterFillSelectSeriesBatchReader(dataType, fill.getFillResult())); + dataTypeMap.put(path.getFullPath(), dataType); } - response.getSeriesDataTypes().put(pathType, dataTypes); - filterReader = new ClusterFilterSeriesBatchReader(queryDataSet, paths, request.getFilterList()); + + response.getSeriesDataTypes().put(PathType.SELECT_PATH, dataTypes); } /** @@ -188,13 +213,32 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM IPointReader reader = AbstractExecutorWithoutTimeGenerator .createSeriesReader(context, paths.get(i), dataTypes, timeFilter); selectSeriesReaders - .put(fullPath, new ClusterBatchReaderWithoutTimeGenerator(dataTypes.get(i), reader)); + .put(fullPath, new ClusterSelectSeriesBatchReader(dataTypes.get(i), reader)); dataTypeMap.put(fullPath, dataTypes.get(i)); } response.getSeriesDataTypes().put(PathType.SELECT_PATH, dataTypes); } /** + * Handle filter series reader + * + * @param plan filter series query plan + */ + private void handleFilterSeriesReader(QueryPlan plan, QueryContext context, + InitSeriesReaderRequest request, InitSeriesReaderResponse response, PathType pathType) + throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException, ProcessorException, IOException { + QueryDataSet queryDataSet = queryProcessExecutor + .processQuery(plan, context); + List<Path> paths = plan.getPaths(); + List<TSDataType> dataTypes = queryDataSet.getDataTypes(); + for (int i = 0; i < paths.size(); i++) { + dataTypeMap.put(paths.get(i).getFullPath(), dataTypes.get(i)); + } + response.getSeriesDataTypes().put(pathType, dataTypes); + filterReader = new ClusterFilterSeriesBatchReader(queryDataSet, paths, request.getFilterList()); + } + + /** * Handle select series query with value filter * * @param plan plan query plan @@ -212,7 +256,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM .createReaderByTimeStamp(path, context); TSDataType dataType = MManager.getInstance().getSeriesType(path.getFullPath()); selectSeriesReaders - .put(path.getFullPath(), new ClusterBatchReaderByTimestamp(readerByTimeStamp, dataType)); + .put(path.getFullPath(), new ClusterSelectSeriesBatchReaderByTimestamp(readerByTimeStamp, dataType)); dataTypeMap.put(path.getFullPath(), dataType); dataTypeList.add(dataType); } @@ -253,7 +297,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM this.queryRound = targetQueryRounds; List<BatchData> batchDataList = new ArrayList<>(); for (String series : fetchDataSeries) { - AbstractClusterBatchReader reader = selectSeriesReaders.get(series); + AbstractClusterSelectSeriesBatchReader reader = selectSeriesReaders.get(series); batchDataList.add(reader.nextBatch(request.getBatchTimestamp())); } cachedBatchDataResult = batchDataList; @@ -309,7 +353,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM return queryRound; } - public Map<String, AbstractClusterBatchReader> getSelectSeriesReaders() { + public Map<String, AbstractClusterSelectSeriesBatchReader> getSelectSeriesReaders() { return selectSeriesReaders; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java index 72c7c70..3f73160 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java @@ -20,9 +20,9 @@ package org.apache.iotdb.cluster.query.reader.coordinatornode; import java.io.IOException; import org.apache.iotdb.cluster.exception.RaftConnectionException; +import org.apache.iotdb.cluster.query.utils.ClusterTimeValuePairUtils; import org.apache.iotdb.db.query.reader.IPointReader; import org.apache.iotdb.db.utils.TimeValuePair; -import org.apache.iotdb.db.utils.TimeValuePairUtils; import org.apache.iotdb.tsfile.read.common.BatchData; /** @@ -63,7 +63,8 @@ public abstract class AbstractClusterPointReader implements IPointReader { @Override public TimeValuePair next() throws IOException { if (hasNext()) { - TimeValuePair timeValuePair = TimeValuePairUtils.getCurrentTimeValuePair(currentBatchData); + TimeValuePair timeValuePair = ClusterTimeValuePairUtils + .getCurrentTimeValuePair(currentBatchData); currentTimeValuePair = timeValuePair; currentBatchData.next(); return timeValuePair; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterSelectSeriesBatchReader.java similarity index 93% copy from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterBatchReader.java copy to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterSelectSeriesBatchReader.java index b0a86bd..6fe28e2 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterBatchReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterSelectSeriesBatchReader.java @@ -26,7 +26,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData; /** * Cluster batch reader, which provides another method to get batch data by batch timestamp. */ -public abstract class AbstractClusterBatchReader implements IBatchReader { +public abstract class AbstractClusterSelectSeriesBatchReader implements IBatchReader { /** * Get batch data by batch time diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java similarity index 62% copy from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterBatchReader.java copy to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java index b0a86bd..55639a1 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterBatchReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java @@ -19,21 +19,21 @@ package org.apache.iotdb.cluster.query.reader.querynode; import java.io.IOException; -import java.util.List; -import org.apache.iotdb.db.query.reader.IBatchReader; +import org.apache.iotdb.cluster.query.manager.common.FillBatchData; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.BatchData; -/** - * Cluster batch reader, which provides another method to get batch data by batch timestamp. - */ -public abstract class AbstractClusterBatchReader implements IBatchReader { +public class ClusterFillSelectSeriesBatchReader extends ClusterSelectSeriesBatchReader { - /** - * Get batch data by batch time - * - * @param batchTime valid batch timestamp - * @return corresponding batch data - */ - public abstract BatchData nextBatch(List<Long> batchTime) throws IOException; + public ClusterFillSelectSeriesBatchReader( + TSDataType dataType, + IPointReader reader) { + super(dataType, reader); + } + @Override + public BatchData nextBatch() throws IOException { + return hasNext() ? new FillBatchData(reader.next(), false) : new FillBatchData(null, true); + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterBatchReaderWithoutTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java similarity index 85% rename from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterBatchReaderWithoutTimeGenerator.java rename to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java index a59c79c..cbbad2e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterBatchReaderWithoutTimeGenerator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java @@ -30,21 +30,22 @@ import org.apache.iotdb.tsfile.read.common.BatchData; /** * BatchReader without time generator for cluster which is used in query node. */ -public class ClusterBatchReaderWithoutTimeGenerator extends AbstractClusterBatchReader { +public class ClusterSelectSeriesBatchReader extends + AbstractClusterSelectSeriesBatchReader { /** * Data type */ - private TSDataType dataType; + protected TSDataType dataType; /** * Point reader */ - private IPointReader reader; + protected IPointReader reader; - private static final ClusterConfig CLUSTER_CONF = ClusterDescriptor.getInstance().getConfig(); + static final ClusterConfig CLUSTER_CONF = ClusterDescriptor.getInstance().getConfig(); - public ClusterBatchReaderWithoutTimeGenerator( + public ClusterSelectSeriesBatchReader( TSDataType dataType, IPointReader reader) { this.dataType = dataType; this.reader = reader; @@ -80,7 +81,7 @@ public class ClusterBatchReaderWithoutTimeGenerator extends AbstractClusterBatch @Override public BatchData nextBatch(List<Long> batchTime) throws IOException { throw new IOException( - "nextBatch(List<Long> batchTime) in ClusterBatchReaderWithoutTimeGenerator is an empty method."); + "nextBatch(List<Long> batchTime) in ClusterSelectSeriesBatchReader is an empty method."); } public TSDataType getDataType() { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterBatchReaderByTimestamp.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderByTimestamp.java similarity index 90% rename from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterBatchReaderByTimestamp.java rename to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderByTimestamp.java index b8c36eb..72dce05 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterBatchReaderByTimestamp.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderByTimestamp.java @@ -27,7 +27,8 @@ import org.apache.iotdb.tsfile.read.common.BatchData; /** * BatchReader by timestamp for cluster which is used in query node. */ -public class ClusterBatchReaderByTimestamp extends AbstractClusterBatchReader { +public class ClusterSelectSeriesBatchReaderByTimestamp extends + AbstractClusterSelectSeriesBatchReader { /** * Reader @@ -39,7 +40,7 @@ public class ClusterBatchReaderByTimestamp extends AbstractClusterBatchReader { */ private TSDataType dataType; - public ClusterBatchReaderByTimestamp( + public ClusterSelectSeriesBatchReaderByTimestamp( EngineReaderByTimeStamp readerByTimeStamp, TSDataType dataType) { this.readerByTimeStamp = readerByTimeStamp; @@ -54,7 +55,7 @@ public class ClusterBatchReaderByTimestamp extends AbstractClusterBatchReader { @Override public BatchData nextBatch() throws IOException { throw new UnsupportedOperationException( - "nextBatch() in ClusterBatchReaderByTimestamp is an empty method."); + "nextBatch() in ClusterSelectSeriesBatchReaderByTimestamp is an empty method."); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java new file mode 100644 index 0000000..a0ee256 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java @@ -0,0 +1,26 @@ +package org.apache.iotdb.cluster.query.utils; + +import org.apache.iotdb.cluster.query.manager.common.FillBatchData; +import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.db.utils.TimeValuePairUtils; +import org.apache.iotdb.tsfile.read.common.BatchData; + +public class ClusterTimeValuePairUtils { + + private ClusterTimeValuePairUtils() { + } + + /** + * get given data's current (time,value) pair. + * + * @param data -batch data + * @return -given data's (time,value) pair + */ + public static TimeValuePair getCurrentTimeValuePair(BatchData data) { + if (data instanceof FillBatchData){ + return ((FillBatchData)data).getTimeValuePair(); + }else{ + return TimeValuePairUtils.getCurrentTimeValuePair(data); + } + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java index be762e1..546282a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java @@ -19,6 +19,8 @@ package org.apache.iotdb.cluster.query.utils; import java.util.ArrayList; +import java.util.EnumMap; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -27,8 +29,11 @@ import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity; import org.apache.iotdb.cluster.utils.QPExecutorUtils; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; +import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan; import org.apache.iotdb.db.qp.physical.crud.GroupByPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; +import org.apache.iotdb.db.query.fill.IFill; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.expression.ExpressionType; import org.apache.iotdb.tsfile.read.expression.IExpression; @@ -43,12 +48,17 @@ public class QueryPlanPartitionUtils { } /** - * Split query plan with no filter or with only global time filter by group id + * Split query plan with no filter, with only global time filter by group id or fill query */ public static void splitQueryPlanWithoutValueFilter( ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException { - splitQueryPlanBySelectPath(singleQueryManager); + QueryPlan queryPLan = singleQueryManager.getOriginQueryPlan(); + if (queryPLan instanceof FillQueryPlan) { + splitFillPlan((FillQueryPlan)queryPLan, singleQueryManager); + } else { + splitQueryPlanBySelectPath(singleQueryManager); + } } /** @@ -93,7 +103,7 @@ public class QueryPlanPartitionUtils { } } - private static void splitGroupByPlan(GroupByPlan queryPlan, + private static void splitGroupByPlan(GroupByPlan groupByPlan, ClusterRpcSingleQueryManager singleQueryManager) { throw new UnsupportedOperationException(); } @@ -103,6 +113,31 @@ public class QueryPlanPartitionUtils { throw new UnsupportedOperationException(); } + private static void splitFillPlan(FillQueryPlan fillQueryPlan, + ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException { + List<Path> selectPaths = fillQueryPlan.getPaths(); + Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId(); + Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans(); + for (Path path : selectPaths) { + String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice()); + if (!selectSeriesByGroupId.containsKey(groupId)) { + selectSeriesByGroupId.put(groupId, new ArrayList<>()); + } + selectSeriesByGroupId.get(groupId).add(path); + } + for (Entry<String, List<Path>> entry : selectSeriesByGroupId.entrySet()) { + String groupId = entry.getKey(); + List<Path> paths = entry.getValue(); + FillQueryPlan subQueryPlan = new FillQueryPlan(); + subQueryPlan.setProposer(fillQueryPlan.getProposer()); + subQueryPlan.setPaths(paths); + subQueryPlan.setExpression(fillQueryPlan.getExpression()); + subQueryPlan.setQueryTime(fillQueryPlan.getQueryTime()); + subQueryPlan.setFillType(new EnumMap<>(fillQueryPlan.getFillType())); + selectPathPlans.put(groupId, subQueryPlan); + } + } + private static void splitQueryPlan(QueryPlan queryPlan, ClusterRpcSingleQueryManager singleQueryManager) throws PathErrorException { splitQueryPlanBySelectPath(singleQueryManager); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBFillQueryIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBFillQueryIT.java new file mode 100644 index 0000000..f5bf17f --- /dev/null +++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBFillQueryIT.java @@ -0,0 +1,366 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.integration; + +import static org.apache.iotdb.cluster.utils.Utils.insertData; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import org.apache.iotdb.cluster.config.ClusterConfig; +import org.apache.iotdb.cluster.config.ClusterDescriptor; +import org.apache.iotdb.cluster.entity.Server; +import org.apache.iotdb.cluster.utils.EnvironmentUtils; +import org.apache.iotdb.cluster.utils.QPExecutorUtils; +import org.apache.iotdb.cluster.utils.hash.PhysicalNode; +import org.apache.iotdb.jdbc.Config; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class IoTDBFillQueryIT { + + + private Server server; + private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig(); + private static final PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(), + CLUSTER_CONFIG.getPort()); + + private static String[] createSQLs = new String[]{ + "SET STORAGE GROUP TO root.ln.wf01.wt01", + "CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN", + "CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=DOUBLE, ENCODING=PLAIN", + "CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32, ENCODING=PLAIN"}; + private static String[] insertSQLs = new String[]{ + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(1, 1.1, false, 11)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(2, 2.2, true, 22)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(3, 3.3, false, 33 )", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(4, 4.4, false, 44)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(5, 5.5, false, 55)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(100, 100.1, false, 110)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(150, 200.2, true, 220)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(200, 300.3, false, 330 )", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(250, 400.4, false, 440)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(300, 500.5, false, 550)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(10, 10.1, false, 110)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(20, 20.2, true, 220)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(30, 30.3, false, 330 )", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(40, 40.4, false, 440)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(50, 50.5, false, 550)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(500, 100.1, false, 110)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(510, 200.2, true, 220)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(520, 300.3, false, 330 )", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(530, 400.4, false, 440)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(540, 500.5, false, 550)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(580, 100.1, false, 110)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(590, 200.2, true, 220)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(600, 300.3, false, 330 )", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(610, 400.4, false, 440)", + "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) " + + "values(620, 500.5, false, 550)", + }; + + private static final String TIMESTAMP_STR = "Time"; + private static final String TEMPERATURE_STR = "root.ln.wf01.wt01.temperature"; + private static final String STATUS_STR = "root.ln.wf01.wt01.status"; + private static final String HARDWARE_STR = "root.ln.wf01.wt01.hardware"; + + @Before + public void setUp() throws Exception { + EnvironmentUtils.cleanEnv(); + EnvironmentUtils.closeStatMonitor(); + EnvironmentUtils.closeMemControl(); + CLUSTER_CONFIG.createAllPath(); + server = Server.getInstance(); + server.start(); + EnvironmentUtils.envSetUp(); + Class.forName(Config.JDBC_DRIVER_NAME); + prepareData(); + } + + @After + public void tearDown() throws Exception { + server.stop(); + QPExecutorUtils.setLocalNodeAddr(localNode.getIp(), localNode.getPort()); + EnvironmentUtils.cleanEnv(); + } + + + @Test + public void LinearFillTest() throws SQLException { + String[] retArray1 = new String[]{ + "3,3.3,false,33", + "70,70.34,false,374", + "70,null,null,null", + "625,null,false,null" + }; + Connection connection = null; + try { + connection = DriverManager. + getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement(); + boolean hasResultSet = statement.execute("select temperature,status, hardware from " + + "root.ln.wf01.wt01 where time = 3 " + + "Fill(int32[linear, 5ms, 5ms], double[linear, 5ms, 5ms], boolean[previous, 5ms])"); + + Assert.assertTrue(hasResultSet); + ResultSet resultSet = statement.getResultSet(); + int cnt = 0; + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR) + + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR); + System.out.println(ans); + Assert.assertEquals(retArray1[cnt], ans); + cnt++; + } + statement.close(); + + statement = connection.createStatement(); + hasResultSet = statement.execute("select temperature,status, hardware " + + "from root.ln.wf01.wt01 where time = 70 Fill(int32[linear, 500ms, 500ms], " + + "double[linear, 500ms, 500ms], boolean[previous, 500ms])"); + + Assert.assertTrue(hasResultSet); + resultSet = statement.getResultSet(); + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR) + + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR); + Assert.assertEquals(retArray1[cnt], ans); + cnt++; + System.out.println(ans); + } + statement.close(); + + statement = connection.createStatement(); + hasResultSet = statement.execute("select temperature,status, hardware " + + "from root.ln.wf01.wt01 where time = 70 " + + "Fill(int32[linear, 25ms, 25ms], double[linear, 25ms, 25ms], boolean[previous, 5ms])"); + + Assert.assertTrue(hasResultSet); + resultSet = statement.getResultSet(); + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR) + + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR); + Assert.assertEquals(retArray1[cnt], ans); + cnt++; + System.out.println(ans); + } + statement.close(); + + statement = connection.createStatement(); + hasResultSet = statement.execute("select temperature,status, hardware " + + "from root.ln.wf01.wt01 where time = 625 " + + "Fill(int32[linear, 25ms, 25ms], double[linear, 25ms, 25ms], boolean[previous, 5ms])"); + + Assert.assertTrue(hasResultSet); + resultSet = statement.getResultSet(); + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR) + + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR); + System.out.println(cnt + " " + ans); + Assert.assertEquals(retArray1[cnt], ans); + cnt++; + } + statement.close(); + Assert.assertEquals(retArray1.length, cnt); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + @Test + public void LinearFillRemoteTest() throws SQLException { + QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); + LinearFillTest(); + } + + @Test + public void PreviousFillTest() throws SQLException { + String[] retArray1 = new String[]{ + "3,3.3,false,33", + "70,50.5,false,550", + "70,null,null,null" + }; + Connection connection = null; + try { + connection = DriverManager. + getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement(); + boolean hasResultSet = statement.execute("select temperature,status, hardware " + + "from root.ln.wf01.wt01 where time = 3 " + + "Fill(int32[previous, 5ms], double[previous, 5ms], boolean[previous, 5ms])"); + + Assert.assertTrue(hasResultSet); + ResultSet resultSet = statement.getResultSet(); + int cnt = 0; + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR) + + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR); + Assert.assertEquals(retArray1[cnt], ans); + cnt++; + } + statement.close(); + + statement = connection.createStatement(); + hasResultSet = statement.execute("select temperature,status, hardware " + + "from root.ln.wf01.wt01 where time = 70 " + + "Fill(int32[previous, 500ms], double[previous, 500ms], boolean[previous, 500ms])"); + + Assert.assertTrue(hasResultSet); + resultSet = statement.getResultSet(); + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR) + + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR); + Assert.assertEquals(retArray1[cnt], ans); + cnt++; + System.out.println(ans); + } + statement.close(); + + statement = connection.createStatement(); + hasResultSet = statement.execute("select temperature,status, hardware " + + "from root.ln.wf01.wt01 where time = 70 " + + "Fill(int32[previous, 15ms], double[previous, 15ms], boolean[previous, 5ms])"); + + Assert.assertTrue(hasResultSet); + resultSet = statement.getResultSet(); + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR) + + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR); + Assert.assertEquals(retArray1[cnt], ans); + cnt++; + System.out.println(ans); + } + statement.close(); + Assert.assertEquals(retArray1.length, cnt); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + @Test + public void PreviousFillRemoteTest() throws SQLException { + QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); + PreviousFillTest(); + } + + @Test + public void EmptyTimeRangeFillTest() throws SQLException { + String[] retArray1 = new String[]{ + "3,3.3,false,33", + "70,70.34,false,374" + }; + Connection connection = null; + try { + connection = DriverManager. + getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement(); + boolean hasResultSet = statement.execute("select temperature,status, hardware " + + "from root.ln.wf01.wt01 where time = 3 " + + "Fill(int32[linear], double[linear], boolean[previous])"); + + Assert.assertTrue(hasResultSet); + ResultSet resultSet = statement.getResultSet(); + int cnt = 0; + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR) + + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR); + Assert.assertEquals(retArray1[cnt], ans); + cnt++; + } + statement.close(); + + statement = connection.createStatement(); + hasResultSet = statement.execute("select temperature,status, hardware " + + "from root.ln.wf01.wt01 where time = 70 " + + "Fill(int32[linear], double[linear], boolean[previous])"); + + Assert.assertTrue(hasResultSet); + resultSet = statement.getResultSet(); + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR) + + "," + resultSet.getString(STATUS_STR) + "," + resultSet.getString(HARDWARE_STR); + Assert.assertEquals(retArray1[cnt], ans); + cnt++; + System.out.println(ans); + } + statement.close(); + Assert.assertEquals(retArray1.length, cnt); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + @Test + public void EmptyTimeRangeFillRemoteTest() throws SQLException { + QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); + EmptyTimeRangeFillTest(); + } + + private void prepareData() throws SQLException { + try (Connection connection = DriverManager + .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", + "root")) { + insertData(connection, createSQLs, insertSQLs); + } + } +} diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java similarity index 99% rename from cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryTest.java rename to cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java index f5cc295..90d4474 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.cluster.query; +package org.apache.iotdb.cluster.integration; import static org.apache.iotdb.cluster.utils.Utils.insertBatchData; import static org.apache.iotdb.cluster.utils.Utils.insertData; @@ -43,7 +43,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -public class ClusterQueryTest { +public class IoTDBQueryIT { private Server server; private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig(); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryLargeDataIT.java similarity index 99% rename from cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java rename to cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryLargeDataIT.java index 223f0dc..926d8a7 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryLargeDataIT.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.cluster.query; +package org.apache.iotdb.cluster.integration; import static org.apache.iotdb.cluster.utils.Utils.insertData; import static org.junit.Assert.assertEquals; @@ -41,7 +41,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -public class ClusterQueryLargeDataTest { +public class IoTDBQueryLargeDataIT { private Server server; diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java index c09aaa5..b822831 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java @@ -39,10 +39,10 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.entity.Server; import org.apache.iotdb.cluster.query.manager.querynode.ClusterLocalQueryManager; import org.apache.iotdb.cluster.query.manager.querynode.ClusterLocalSingleQueryManager; -import org.apache.iotdb.cluster.query.reader.querynode.ClusterBatchReaderByTimestamp; -import org.apache.iotdb.cluster.query.reader.querynode.ClusterBatchReaderWithoutTimeGenerator; +import org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterSelectSeriesBatchReader; +import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader; +import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderByTimestamp; import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReader; -import org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterBatchReader; import org.apache.iotdb.cluster.utils.EnvironmentUtils; import org.apache.iotdb.cluster.utils.QPExecutorUtils; import org.apache.iotdb.cluster.utils.hash.PhysicalNode; @@ -220,17 +220,17 @@ public class ClusterLocalManagerTest { assertEquals((long) map.get(taskId), singleQueryManager.getJobId()); assertEquals(0, singleQueryManager.getQueryRound()); assertNull(singleQueryManager.getFilterReader()); - Map<String, AbstractClusterBatchReader> selectSeriesReaders = singleQueryManager + Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager .getSelectSeriesReaders(); assertEquals(3, selectSeriesReaders.size()); Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap(); - for (Entry<String, AbstractClusterBatchReader> entry : selectSeriesReaders.entrySet()) { + for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) { String path = entry.getKey(); TSDataType dataType = typeMap.get(path); - AbstractClusterBatchReader clusterBatchReader = entry.getValue(); - assertNotNull(((ClusterBatchReaderWithoutTimeGenerator) clusterBatchReader).getReader()); + AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue(); + assertNotNull(((ClusterSelectSeriesBatchReader) clusterBatchReader).getReader()); assertEquals(dataType, - ((ClusterBatchReaderWithoutTimeGenerator) clusterBatchReader).getDataType()); + ((ClusterSelectSeriesBatchReader) clusterBatchReader).getDataType()); } } @@ -247,17 +247,17 @@ public class ClusterLocalManagerTest { assertEquals((long) map.get(taskId), singleQueryManager.getJobId()); assertEquals(0, singleQueryManager.getQueryRound()); assertNull(singleQueryManager.getFilterReader()); - Map<String, AbstractClusterBatchReader> selectSeriesReaders = singleQueryManager + Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager .getSelectSeriesReaders(); assertEquals(3, selectSeriesReaders.size()); Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap(); - for (Entry<String, AbstractClusterBatchReader> entry : selectSeriesReaders.entrySet()) { + for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) { String path = entry.getKey(); TSDataType dataType = typeMap.get(path); - AbstractClusterBatchReader clusterBatchReader = entry.getValue(); - assertNotNull(((ClusterBatchReaderWithoutTimeGenerator) clusterBatchReader).getReader()); + AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue(); + assertNotNull(((ClusterSelectSeriesBatchReader) clusterBatchReader).getReader()); assertEquals(dataType, - ((ClusterBatchReaderWithoutTimeGenerator) clusterBatchReader).getDataType()); + ((ClusterSelectSeriesBatchReader) clusterBatchReader).getDataType()); } } @@ -274,17 +274,17 @@ public class ClusterLocalManagerTest { assertEquals((long) map.get(taskId), singleQueryManager.getJobId()); assertEquals(0, singleQueryManager.getQueryRound()); assertNull(singleQueryManager.getFilterReader()); - Map<String, AbstractClusterBatchReader> selectSeriesReaders = singleQueryManager + Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager .getSelectSeriesReaders(); assertEquals(3, selectSeriesReaders.size()); Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap(); - for (Entry<String, AbstractClusterBatchReader> entry : selectSeriesReaders.entrySet()) { + for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) { String path = entry.getKey(); TSDataType dataType = typeMap.get(path); - AbstractClusterBatchReader clusterBatchReader = entry.getValue(); - assertNotNull(((ClusterBatchReaderWithoutTimeGenerator) clusterBatchReader).getReader()); + AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue(); + assertNotNull(((ClusterSelectSeriesBatchReader) clusterBatchReader).getReader()); assertEquals(dataType, - ((ClusterBatchReaderWithoutTimeGenerator) clusterBatchReader).getDataType()); + ((ClusterSelectSeriesBatchReader) clusterBatchReader).getDataType()); } } statement.close(); @@ -317,18 +317,18 @@ public class ClusterLocalManagerTest { assertTrue(allFilterPaths.containsAll(filterReader.getAllFilterPath())); assertNotNull(filterReader.getQueryDataSet()); - Map<String, AbstractClusterBatchReader> selectSeriesReaders = singleQueryManager + Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager .getSelectSeriesReaders(); assertNotNull(selectSeriesReaders); assertEquals(3, selectSeriesReaders.size()); Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap(); - for (Entry<String, AbstractClusterBatchReader> entry : selectSeriesReaders.entrySet()) { + for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) { String path = entry.getKey(); TSDataType dataType = typeMap.get(path); - AbstractClusterBatchReader clusterBatchReader = entry.getValue(); - assertNotNull(((ClusterBatchReaderByTimestamp) clusterBatchReader).getReaderByTimeStamp()); + AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue(); + assertNotNull(((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getReaderByTimeStamp()); assertEquals(dataType, - ((ClusterBatchReaderByTimestamp) clusterBatchReader).getDataType()); + ((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getDataType()); } } @@ -351,18 +351,18 @@ public class ClusterLocalManagerTest { assertTrue(allFilterPaths.containsAll(filterReader.getAllFilterPath())); assertNotNull(filterReader.getQueryDataSet()); - Map<String, AbstractClusterBatchReader> selectSeriesReaders = singleQueryManager + Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager .getSelectSeriesReaders(); assertNotNull(selectSeriesReaders); assertEquals(3, selectSeriesReaders.size()); Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap(); - for (Entry<String, AbstractClusterBatchReader> entry : selectSeriesReaders.entrySet()) { + for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) { String path = entry.getKey(); TSDataType dataType = typeMap.get(path); - AbstractClusterBatchReader clusterBatchReader = entry.getValue(); - assertNotNull(((ClusterBatchReaderByTimestamp) clusterBatchReader).getReaderByTimeStamp()); + AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue(); + assertNotNull(((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getReaderByTimeStamp()); assertEquals(dataType, - ((ClusterBatchReaderByTimestamp) clusterBatchReader).getDataType()); + ((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getDataType()); } } @@ -385,18 +385,18 @@ public class ClusterLocalManagerTest { assertTrue(allFilterPaths.containsAll(filterReader.getAllFilterPath())); assertNotNull(filterReader.getQueryDataSet()); - Map<String, AbstractClusterBatchReader> selectSeriesReaders = singleQueryManager + Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager .getSelectSeriesReaders(); assertNotNull(selectSeriesReaders); assertEquals(3, selectSeriesReaders.size()); Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap(); - for (Entry<String, AbstractClusterBatchReader> entry : selectSeriesReaders.entrySet()) { + for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) { String path = entry.getKey(); TSDataType dataType = typeMap.get(path); - AbstractClusterBatchReader clusterBatchReader = entry.getValue(); - assertNotNull(((ClusterBatchReaderByTimestamp) clusterBatchReader).getReaderByTimeStamp()); + AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue(); + assertNotNull(((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getReaderByTimeStamp()); assertEquals(dataType, - ((ClusterBatchReaderByTimestamp) clusterBatchReader).getDataType()); + ((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getDataType()); } } statement.close(); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java index 83c5fa9..904bc2d 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.iotdb.db.query.executor; import java.io.IOException; @@ -37,7 +36,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; -public class FillEngineExecutor { +public class FillEngineExecutor implements IFillEngineExecutor{ private long jobId; private List<Path> selectedSeries; @@ -52,11 +51,7 @@ public class FillEngineExecutor { this.typeIFillMap = typeIFillMap; } - /** - * execute fill. - * - * @param context query context - */ + @Override public QueryDataSet execute(QueryContext context) throws FileNodeManagerException, PathErrorException, IOException { QueryResourceManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedSeries); @@ -68,7 +63,7 @@ public class FillEngineExecutor { .getQueryDataSource(path, context); TSDataType dataType = MManager.getInstance().getSeriesType(path.getFullPath()); dataTypeList.add(dataType); - IFill fill = null; + IFill fill; if (!typeIFillMap.containsKey(dataType)) { fill = new PreviousFill(dataType, queryTime, 0); } else { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterBatchReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IFillEngineExecutor.java similarity index 60% rename from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterBatchReader.java rename to iotdb/src/main/java/org/apache/iotdb/db/query/executor/IFillEngineExecutor.java index b0a86bd..9e207ae 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterBatchReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IFillEngineExecutor.java @@ -16,24 +16,21 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.cluster.query.reader.querynode; +package org.apache.iotdb.db.query.executor; import java.io.IOException; -import java.util.List; -import org.apache.iotdb.db.query.reader.IBatchReader; -import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.db.exception.FileNodeManagerException; +import org.apache.iotdb.db.exception.PathErrorException; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; -/** - * Cluster batch reader, which provides another method to get batch data by batch timestamp. - */ -public abstract class AbstractClusterBatchReader implements IBatchReader { +public interface IFillEngineExecutor { /** - * Get batch data by batch time + * execute fill. * - * @param batchTime valid batch timestamp - * @return corresponding batch data + * @param context query context */ - public abstract BatchData nextBatch(List<Long> batchTime) throws IOException; - + QueryDataSet execute(QueryContext context) + throws FileNodeManagerException, PathErrorException, IOException; } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java index d64b49a..9f9a050 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java @@ -16,10 +16,10 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.iotdb.db.query.fill; import java.io.IOException; +import java.io.Serializable; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.factory.SeriesReaderFactory; @@ -33,12 +33,13 @@ import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.filter.TimeFilter; import org.apache.iotdb.tsfile.read.filter.basic.Filter; -public abstract class IFill { +public abstract class IFill implements Serializable { + private static final long serialVersionUID = -357739398193527464L; long queryTime; TSDataType dataType; - IPointReader allDataReader; + transient IPointReader allDataReader; public IFill(TSDataType dataType, long queryTime) { this.dataType = dataType; @@ -106,8 +107,11 @@ public abstract class IFill { @Override public TimeValuePair next() { - isUsed = true; - return pair; + if (!isUsed) { + isUsed = true; + return pair; + } + return null; } @Override diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java index dc46082..399e2b8 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.fill; import java.io.IOException; +import java.io.Serializable; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.UnSupportedFillTypeException; import org.apache.iotdb.db.query.context.QueryContext; @@ -29,8 +30,9 @@ import org.apache.iotdb.db.utils.TsPrimitiveType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; -public class LinearFill extends IFill { +public class LinearFill extends IFill implements Serializable { + private static final long serialVersionUID = -1774599523110930574L; private long beforeRange; private long afterRange; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java index b75fb4f..93c50a8 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.query.fill; import java.io.IOException; +import java.io.Serializable; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.reader.IPointReader; @@ -26,8 +27,9 @@ import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; -public class PreviousFill extends IFill { +public class PreviousFill extends IFill implements Serializable { + private static final long serialVersionUID = -7946089166912781464L; private long beforeRange; public PreviousFill(TSDataType dataType, long queryTime, long beforeRange) { @@ -57,7 +59,7 @@ public class PreviousFill extends IFill { @Override public IPointReader getFillResult() throws IOException { TimeValuePair beforePair = null; - TimeValuePair cachedPair = null; + TimeValuePair cachedPair; while (allDataReader.hasNext()) { cachedPair = allDataReader.next(); if (cachedPair.getTimestamp() <= queryTime) {
