This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch new_vector in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c95efc9d1381c1419a6768bad9cf7c12831dd3f6 Author: JackieTien97 <[email protected]> AuthorDate: Fri Oct 29 16:06:17 2021 +0800 add VectorSeriesReader --- .../cluster/query/reader/ClusterReaderFactory.java | 11 +--- .../org/apache/iotdb/db/metadata/PartialPath.java | 71 +++++++++++++++++----- .../iotdb/db/metadata/VectorPartialPath.java | 52 ++++++++++++++-- .../query/reader/series/SeriesAggregateReader.java | 13 +--- .../reader/series/SeriesRawDataBatchReader.java | 32 +++------- .../iotdb/db/query/reader/series/SeriesReader.java | 36 ++++++----- .../reader/series/SeriesReaderByTimestamp.java | 13 +--- .../reader/series/VectorSeriesAggregateReader.java | 2 +- .../db/query/reader/series/VectorSeriesReader.java | 65 ++++++++++++++++++++ .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 7 +-- 10 files changed, 207 insertions(+), 95 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java index 1de13a2..9a35b61 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java @@ -560,16 +560,7 @@ public class ClusterReaderFactory { QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter); valueFilter = queryDataSource.updateFilterUsingTTL(valueFilter); - return new SeriesReader( - path, - allSensors, - dataType, - context, - queryDataSource, - timeFilter, - valueFilter, - new SlotTsFileFilter(requiredSlots), - ascending); + return path.createSeriesReader(allSensors, dataType, context, queryDataSource, timeFilter, valueFilter, new SlotTsFileFilter(requiredSlots), ascending); } /** diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java index c052161..b4cf014 100755 --- a/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java @@ -18,25 +18,31 @@ */ package org.apache.iotdb.db.metadata; +import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; +import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.regex.Pattern; import org.apache.iotdb.db.conf.IoTDBConstant; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.metadata.utils.MetaUtils; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.filter.TsFileFilter; +import org.apache.iotdb.db.query.reader.series.SeriesReader; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; - +import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.regex.Pattern; - -import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; -import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD; - /** * A prefix path, suffix path or fullPath generated from SQL. Usually used in the IoTDB server * module @@ -49,7 +55,9 @@ public class PartialPath extends Path implements Comparable<Path> { // alias of measurement, null pointer cannot be serialized in thrift so empty string is instead protected String measurementAlias = ""; - public PartialPath() {} + public PartialPath() { + } + /** * Construct the PartialPath using a String, will split the given String into String[] E.g., path * = "root.sg.\"d.1\".\"s.1\"" nodes = {"root", "sg", "\"d.1\"", "\"s.1\""} @@ -67,15 +75,17 @@ public class PartialPath extends Path implements Comparable<Path> { this.nodes = MetaUtils.splitPathToDetachedPath(fullPath); } - /** @param partialNodes nodes of a time series path */ + /** + * @param partialNodes nodes of a time series path + */ public PartialPath(String[] partialNodes) { nodes = partialNodes; } /** - * @param path path + * @param path path * @param needSplit needSplit is basically false, whether need to be split to device and - * measurement, doesn't support escape character yet. + * measurement, doesn't support escape character yet. */ public PartialPath(String path, boolean needSplit) { super(path, needSplit); @@ -358,4 +368,37 @@ public class PartialPath extends Path implements Comparable<Path> { public String getExactFullPath() { return getFullPath(); } + + public SeriesReader createSeriesReader(Set<String> allSensors, + TSDataType dataType, + QueryContext context, + QueryDataSource dataSource, + Filter timeFilter, + Filter valueFilter, + TsFileFilter fileFilter, + boolean ascending) { + return new SeriesReader( + this, + allSensors, + dataType, + context, + dataSource, + timeFilter, + valueFilter, + fileFilter, + ascending); + } + + @TestOnly + public SeriesReader createSeriesReader(Set<String> allSensors, + TSDataType dataType, + QueryContext context, + List<TsFileResource> seqFileResource, + List<TsFileResource> unseqFileResource, + Filter timeFilter, + Filter valueFilter, + boolean ascending) { + return new SeriesReader(this, allSensors, dataType, context, seqFileResource, unseqFileResource, + timeFilter, valueFilter, ascending); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/VectorPartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/VectorPartialPath.java index 82b81a1..93fcd3d 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/VectorPartialPath.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/VectorPartialPath.java @@ -19,12 +19,20 @@ package org.apache.iotdb.db.metadata; -import org.apache.iotdb.db.exception.metadata.IllegalPathException; -import org.apache.iotdb.tsfile.common.constant.TsFileConstant; - import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Set; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.metadata.IllegalPathException; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.filter.TsFileFilter; +import org.apache.iotdb.db.query.reader.series.VectorSeriesReader; +import org.apache.iotdb.db.utils.TestOnly; +import org.apache.iotdb.tsfile.common.constant.TsFileConstant; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; /** * VectorPartialPath represents a vector's fullPath. It not only contains the full path of vector's @@ -36,7 +44,8 @@ public class VectorPartialPath extends PartialPath { private List<String> subSensorsList; - public VectorPartialPath() {} + public VectorPartialPath() { + } public VectorPartialPath(String vectorPath, List<String> subSensorsList) throws IllegalPathException { @@ -119,4 +128,39 @@ public class VectorPartialPath extends PartialPath { } return fullPath; } + + @Override + public VectorSeriesReader createSeriesReader(Set<String> allSensors, + TSDataType dataType, + QueryContext context, + QueryDataSource dataSource, + Filter timeFilter, + Filter valueFilter, + TsFileFilter fileFilter, + boolean ascending) { + return new VectorSeriesReader( + this, + allSensors, + dataType, + context, + dataSource, + timeFilter, + valueFilter, + fileFilter, + ascending); + } + + @Override + @TestOnly + public VectorSeriesReader createSeriesReader(Set<String> allSensors, + TSDataType dataType, + QueryContext context, + List<TsFileResource> seqFileResource, + List<TsFileResource> unseqFileResource, + Filter timeFilter, + Filter valueFilter, + boolean ascending) { + return new VectorSeriesReader(this, allSensors, dataType, context, seqFileResource, + unseqFileResource, timeFilter, valueFilter, ascending); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java index 04cb907..91f161f 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java @@ -44,17 +44,8 @@ public class SeriesAggregateReader implements IAggregateReader { Filter valueFilter, TsFileFilter fileFilter, boolean ascending) { - this.seriesReader = - new SeriesReader( - seriesPath, - allSensors, - dataType, - context, - dataSource, - timeFilter, - valueFilter, - fileFilter, - ascending); + this.seriesReader = seriesPath.createSeriesReader(allSensors, dataType, context, dataSource, + timeFilter, valueFilter, fileFilter, ascending); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java index 6af9c4e..c944ca4 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java @@ -57,17 +57,8 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader { Filter valueFilter, TsFileFilter fileFilter, boolean ascending) { - this.seriesReader = - new SeriesReader( - seriesPath, - allSensors, - dataType, - context, - dataSource, - timeFilter, - valueFilter, - fileFilter, - ascending); + this.seriesReader = seriesPath.createSeriesReader(allSensors, dataType, context, dataSource, + timeFilter, valueFilter, fileFilter, ascending); } @TestOnly @@ -83,17 +74,14 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader { boolean ascending) { Set<String> allSensors = new HashSet<>(); allSensors.add(seriesPath.getMeasurement()); - this.seriesReader = - new SeriesReader( - seriesPath, - allSensors, - dataType, - context, - seqFileResource, - unseqFileResource, - timeFilter, - valueFilter, - ascending); + this.seriesReader = seriesPath.createSeriesReader(allSensors, + dataType, + context, + seqFileResource, + unseqFileResource, + timeFilter, + valueFilter, + ascending); } /** diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java index c2af73e..16a508c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java @@ -18,8 +18,16 @@ */ package org.apache.iotdb.db.query.reader.series; -import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBDescriptor; +import java.io.IOException; +import java.io.Serializable; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.function.ToLongFunction; +import java.util.stream.Collectors; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.metadata.PartialPath; @@ -47,20 +55,8 @@ import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter; import org.apache.iotdb.tsfile.read.reader.IPageReader; import org.apache.iotdb.tsfile.read.reader.page.VectorPageReader; -import java.io.IOException; -import java.io.Serializable; -import java.util.Comparator; -import java.util.LinkedList; -import java.util.List; -import java.util.Objects; -import java.util.PriorityQueue; -import java.util.Set; -import java.util.function.ToLongFunction; -import java.util.stream.Collectors; - public class SeriesReader { - private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); // inner class of SeriesReader for order purpose protected TimeOrderUtils orderUtils; @@ -185,7 +181,7 @@ public class SeriesReader { @TestOnly @SuppressWarnings("squid:S107") - SeriesReader( + public SeriesReader( PartialPath seriesPath, Set<String> allSensors, TSDataType dataType, @@ -1028,7 +1024,7 @@ public class SeriesReader { protected void unpackUnseqTsFileResource() throws IOException { ITimeSeriesMetadata timeseriesMetadata = - FileLoaderUtils.loadTimeSeriesMetadata( + loadTimeSeriesMetadata( unseqFileResource.remove(0), seriesPath, context, getAnyFilter(), allSensors); if (timeseriesMetadata != null) { timeseriesMetadata.setModified(true); @@ -1037,6 +1033,14 @@ public class SeriesReader { } } + protected ITimeSeriesMetadata loadTimeSeriesMetadata(TsFileResource resource, + PartialPath seriesPath, + QueryContext context, + Filter filter, + Set<String> allSensors) throws IOException { + return FileLoaderUtils.loadTimeSeriesMetadata(resource, seriesPath, context, filter, allSensors); + } + protected Filter getAnyFilter() { return timeFilter != null ? timeFilter : valueFilter; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java index 27796ff..bd68bed 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java @@ -47,17 +47,8 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp { boolean ascending) { UnaryFilter timeFilter = ascending ? TimeFilter.gtEq(Long.MIN_VALUE) : TimeFilter.ltEq(Long.MAX_VALUE); - this.seriesReader = - new SeriesReader( - seriesPath, - allSensors, - dataType, - context, - dataSource, - timeFilter, - null, - fileFilter, - ascending); + this.seriesReader = seriesPath.createSeriesReader(allSensors, dataType, context, dataSource, + timeFilter, null, fileFilter, ascending); this.ascending = ascending; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java index 23b1243..8db2bd2 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java @@ -53,7 +53,7 @@ public class VectorSeriesAggregateReader implements IAggregateReader { TsFileFilter fileFilter, boolean ascending) { this.seriesReader = - new SeriesReader( + new VectorSeriesReader( seriesPath, allSensors, dataType, diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesReader.java new file mode 100644 index 0000000..db47258 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesReader.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.query.reader.series; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.VectorPartialPath; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.filter.TsFileFilter; +import org.apache.iotdb.db.utils.FileLoaderUtils; +import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; + +public class VectorSeriesReader extends SeriesReader { + + public VectorSeriesReader(PartialPath seriesPath, + Set<String> allSensors, + TSDataType dataType, + QueryContext context, + QueryDataSource dataSource, + Filter timeFilter, + Filter valueFilter, + TsFileFilter fileFilter, boolean ascending) { + super(seriesPath, allSensors, dataType, context, dataSource, timeFilter, valueFilter, + fileFilter, + ascending); + } + + public VectorSeriesReader(PartialPath seriesPath, Set<String> allSensors, + TSDataType dataType, QueryContext context, + List<TsFileResource> seqFileResource, + List<TsFileResource> unseqFileResource, + Filter timeFilter, Filter valueFilter, boolean ascending) { + super(seriesPath, allSensors, dataType, context, seqFileResource, unseqFileResource, timeFilter, + valueFilter, ascending); + } + + @Override + protected ITimeSeriesMetadata loadTimeSeriesMetadata(TsFileResource resource, + PartialPath seriesPath, QueryContext context, Filter filter, Set<String> allSensors) + throws IOException { + return FileLoaderUtils.loadTimeSeriesMetadata(resource, (VectorPartialPath) seriesPath, context, filter, allSensors); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java index 3d2931b..9626991 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java @@ -99,11 +99,6 @@ public class FileLoaderUtils { Filter filter, Set<String> allSensors) throws IOException { - // deal with vector - if (seriesPath instanceof VectorPartialPath) { - return loadVectorTimeSeriesMetadata( - resource, (VectorPartialPath) seriesPath, context, filter, allSensors); - } // common path ITimeSeriesMetadata timeSeriesMetadata; @@ -159,7 +154,7 @@ public class FileLoaderUtils { * [root.sg1.d1.vector.s1, root.sg1.d1.vector.s2]) * @param allSensors all sensors belonging to this device that appear in query */ - private static VectorTimeSeriesMetadata loadVectorTimeSeriesMetadata( + public static VectorTimeSeriesMetadata loadTimeSeriesMetadata( TsFileResource resource, VectorPartialPath vectorPath, QueryContext context,
