This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch AlignedQueryWithValueFilter in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e19a361100a186dadcbcb421536d6ae01ed729d9 Author: JackieTien97 <[email protected]> AuthorDate: Mon Jan 10 15:53:58 2022 +0800 Optimize raw query with value filter for aligned paths --- .../cluster/query/ClusterUDTFQueryExecutor.java | 21 ++++-- .../apache/iotdb/db/metadata/path/AlignedPath.java | 17 +++++ .../dataset/RawQueryDataSetWithValueFilter.java | 52 ++++++++++++--- .../db/query/dataset/UDTFAlignByTimeDataSet.java | 2 + .../apache/iotdb/db/query/dataset/UDTFDataSet.java | 2 + .../db/query/dataset/UDTFNonAlignDataSet.java | 2 + .../iotdb/db/query/executor/QueryRouter.java | 10 ++- .../db/query/executor/RawDataQueryExecutor.java | 77 +++++++++++++++++++--- .../iotdb/db/query/executor/UDFQueryExecutor.java | 21 ++++-- .../query/udf/core/layer/RawQueryInputLayer.java | 4 +- .../apache/iotdb/tsfile/read/common/RowRecord.java | 18 +++++ 11 files changed, 197 insertions(+), 29 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java index 0da64d7..8a32d38 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java @@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.query; import org.apache.iotdb.cluster.server.member.MetaGroupMember; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.path.MeasurementPath; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.dataset.UDTFAlignByTimeDataSet; @@ -30,10 +31,12 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; +import org.apache.iotdb.tsfile.utils.Pair; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths; @@ -56,16 +59,21 @@ public class ClusterUDTFQueryExecutor extends ClusterDataQueryExecutor { public QueryDataSet executeWithValueFilterAlignByTime(QueryContext context) throws StorageEngineException, QueryProcessException, IOException { + // transfer to MeasurementPath to AlignedPath if it's under an aligned entity + queryPlan.setDeduplicatedPaths( + queryPlan.getDeduplicatedPaths().stream() + .map(p -> ((MeasurementPath) p).transformToExactPath()) + .collect(Collectors.toList())); TimeGenerator timestampGenerator = getTimeGenerator(context, udtfPlan); List<Boolean> cached = markFilterdPaths( udtfPlan.getExpression(), new ArrayList<>(udtfPlan.getDeduplicatedPaths()), timestampGenerator.hasOrNode()); - List<IReaderByTimestamp> readersOfSelectedSeries = + Pair<List<IReaderByTimestamp>, List<List<Integer>>> pair = initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter()); return new UDTFAlignByTimeDataSet( - context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached); + context, udtfPlan, timestampGenerator, pair.left, pair.right, cached); } public QueryDataSet executeWithoutValueFilterNonAlign(QueryContext context) @@ -76,15 +84,20 @@ public class ClusterUDTFQueryExecutor extends ClusterDataQueryExecutor { public QueryDataSet executeWithValueFilterNonAlign(QueryContext context) throws QueryProcessException, StorageEngineException, IOException { + // transfer to MeasurementPath to AlignedPath if it's under an aligned entity + queryPlan.setDeduplicatedPaths( + queryPlan.getDeduplicatedPaths().stream() + .map(p -> ((MeasurementPath) p).transformToExactPath()) + .collect(Collectors.toList())); TimeGenerator timestampGenerator = getTimeGenerator(context, udtfPlan); List<Boolean> cached = markFilterdPaths( udtfPlan.getExpression(), new ArrayList<>(udtfPlan.getDeduplicatedPaths()), timestampGenerator.hasOrNode()); - List<IReaderByTimestamp> readersOfSelectedSeries = + Pair<List<IReaderByTimestamp>, List<List<Integer>>> pair = initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter()); return new UDTFNonAlignDataSet( - context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached); + context, udtfPlan, timestampGenerator, pair.left, pair.right, cached); } } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java index c5078ef..b585559 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java @@ -170,6 +170,23 @@ public class AlignedPath extends PartialPath { schemaList.add(measurementPath.getMeasurementSchema()); } + /** + * merge another aligned path's sub sensors into this one + * + * @param alignedPath The caller need to ensure the alignedPath must have same device as this one + * and these two doesn't have same sub sensor + */ + public void mergeAlignedPath(AlignedPath alignedPath) { + if (measurementList == null) { + measurementList = new ArrayList<>(); + } + measurementList.addAll(alignedPath.measurementList); + if (schemaList == null) { + schemaList = new ArrayList<>(); + } + schemaList.addAll(alignedPath.schemaList); + } + public List<IMeasurementSchema> getSchemaList() { return this.schemaList == null ? Collections.emptyList() : this.schemaList; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java index 630e45b..0270325 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java @@ -34,12 +34,17 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements IUDF private final TimeGenerator timeGenerator; private final List<IReaderByTimestamp> seriesReaderByTimestampList; + // reader -> index list in Result RowRecord + // if the reader is an aligned sensor's reader, the corresponding index list will contain more + // than one + private final List<List<Integer>> readerToIndexList; + private final List<Boolean> cached; private final List<RowRecord> cachedRowRecords = new ArrayList<>(); /** Used for UDF. */ - private List<Object[]> cachedRowInObjects = new ArrayList<>(); + private final List<Object[]> cachedRowInObjects = new ArrayList<>(); /** * constructor of EngineDataSetWithValueFilter. @@ -56,11 +61,13 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements IUDF List<TSDataType> dataTypes, TimeGenerator timeGenerator, List<IReaderByTimestamp> readers, + List<List<Integer>> readerToIndexList, List<Boolean> cached, boolean ascending) { super(new ArrayList<>(paths), dataTypes, ascending); this.timeGenerator = timeGenerator; this.seriesReaderByTimestampList = readers; + this.readerToIndexList = readerToIndexList; this.cached = cached; } @@ -100,6 +107,9 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements IUDF RowRecord[] rowRecords = new RowRecord[cachedTimeCnt]; for (int i = 0; i < cachedTimeCnt; i++) { rowRecords[i] = new RowRecord(cachedTimeArray[i]); + for (int columnIndex = 0; columnIndex < columnNum; columnIndex++) { + rowRecords[i].addField(null); + } } boolean[] hasField = new boolean[cachedTimeCnt]; @@ -119,21 +129,25 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements IUDF // 3. use values in results to fill row record for (int j = 0; j < cachedTimeCnt; j++) { if (results == null || results[j] == null) { - rowRecords[j].addField(null); + for (int index : readerToIndexList.get(i)) { + rowRecords[j].setField(null, index); + } } else { if (dataTypes.get(i) == TSDataType.VECTOR) { TsPrimitiveType[] result = (TsPrimitiveType[]) results[j]; - for (TsPrimitiveType value : result) { + for (int k = 0; k < result.length; k++) { + TsPrimitiveType value = result[k]; + int index = readerToIndexList.get(i).get(k); if (value == null) { - rowRecords[j].addField(null); + rowRecords[j].setField(null, index); } else { hasField[j] = true; - rowRecords[j].addField(value.getValue(), value.getDataType()); + rowRecords[j].setField(value.getValue(), value.getDataType(), index); } } } else { hasField[j] = true; - rowRecords[j].addField(results[j], dataTypes.get(i)); + rowRecords[j].setField(results[j], dataTypes.get(i), readerToIndexList.get(i).get(0)); } } } @@ -185,9 +199,9 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements IUDF return false; } - Object[][] rowsInObject = new Object[cachedTimeCnt][seriesReaderByTimestampList.size() + 1]; + Object[][] rowsInObject = new Object[cachedTimeCnt][columnNum + 1]; for (int i = 0; i < cachedTimeCnt; i++) { - rowsInObject[i][seriesReaderByTimestampList.size()] = cachedTimeArray[i]; + rowsInObject[i][columnNum] = cachedTimeArray[i]; } boolean[] hasField = new boolean[cachedTimeCnt]; @@ -207,7 +221,29 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements IUDF // 3. use values in results to fill row record for (int j = 0; j < cachedTimeCnt; j++) { if (results != null && results[j] != null) { + + if (dataTypes.get(i) == TSDataType.VECTOR) { + TsPrimitiveType[] result = (TsPrimitiveType[]) results[j]; + for (int k = 0; k < result.length; k++) { + TsPrimitiveType value = result[k]; + int index = readerToIndexList.get(i).get(k); + if (value == null) { + rowsInObject[j][index] = null; + } else { + hasField[j] = true; + rowsInObject[j][index] = value.getValue(); + } + } + } else { + hasField[j] = true; + + rowsInObject[j][readerToIndexList.get(i).get(0)] = results[j]; + } + hasField[j] = true; + for (int index : readerToIndexList.get(i)) { + rowsInObject[j][index] = results[j]; + } rowsInObject[j][i] = results[j]; } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java index 7a6b37d..c750f55 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java @@ -52,6 +52,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy UDTFPlan udtfPlan, TimeGenerator timestampGenerator, List<IReaderByTimestamp> readersOfSelectedSeries, + List<List<Integer>> readerToIndexList, List<Boolean> cached) throws IOException, QueryProcessException { super( @@ -61,6 +62,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy udtfPlan.getDeduplicatedDataTypes(), timestampGenerator, readersOfSelectedSeries, + readerToIndexList, cached); keepNull = false; initTimeHeap(); diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java index 52ca43f..1fe27e7 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java @@ -62,6 +62,7 @@ public abstract class UDTFDataSet extends QueryDataSet { List<TSDataType> deduplicatedDataTypes, TimeGenerator timestampGenerator, List<IReaderByTimestamp> readersOfSelectedSeries, + List<List<Integer>> readerToIndexList, List<Boolean> cached) throws QueryProcessException, IOException { super(new ArrayList<>(deduplicatedPaths), deduplicatedDataTypes); @@ -75,6 +76,7 @@ public abstract class UDTFDataSet extends QueryDataSet { deduplicatedDataTypes, timestampGenerator, readersOfSelectedSeries, + readerToIndexList, cached); initTransformers(); diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java index afd1824..01af59c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java @@ -56,6 +56,7 @@ public class UDTFNonAlignDataSet extends UDTFDataSet implements DirectNonAlignDa UDTFPlan udtfPlan, TimeGenerator timestampGenerator, List<IReaderByTimestamp> readersOfSelectedSeries, + List<List<Integer>> readerToIndexList, List<Boolean> cached) throws IOException, QueryProcessException { super( @@ -65,6 +66,7 @@ public class UDTFNonAlignDataSet extends UDTFDataSet implements DirectNonAlignDa udtfPlan.getDeduplicatedDataTypes(), timestampGenerator, readersOfSelectedSeries, + readerToIndexList, cached); isInitialized = false; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java index 64c5fe6..b89e316 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java @@ -86,12 +86,12 @@ public class QueryRouter implements IQueryRouter { } queryPlan.setExpression(optimizedExpression); - // group the vector partial paths for raw query after optimize the expression - // because path in expressions should not be grouped - queryPlan.transformToVector(); RawDataQueryExecutor rawDataQueryExecutor = getRawDataQueryExecutor(queryPlan); if (!queryPlan.isAlignByTime()) { + // group the vector partial paths for raw query after optimize the expression + // because path in expressions should not be grouped + queryPlan.transformToVector(); return rawDataQueryExecutor.executeNonAlign(context); } @@ -107,6 +107,10 @@ public class QueryRouter implements IQueryRouter { return new EmptyDataSet(); } } + + // group the vector partial paths for raw query after optimize the expression + // because path in expressions should not be grouped + queryPlan.transformToVector(); return rawDataQueryExecutor.executeWithoutValueFilter(context); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java index 9923570..c1f101e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java @@ -23,6 +23,8 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.path.AlignedPath; +import org.apache.iotdb.db.metadata.path.MeasurementPath; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; import org.apache.iotdb.db.query.context.QueryContext; @@ -46,10 +48,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; +import java.util.stream.Collectors; import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths; @@ -162,31 +162,90 @@ public class RawDataQueryExecutor { return dataSet; } + // transfer to MeasurementPath to AlignedPath if it's under an aligned entity + queryPlan.setDeduplicatedPaths( + queryPlan.getDeduplicatedPaths().stream() + .map(p -> ((MeasurementPath) p).transformToExactPath()) + .collect(Collectors.toList())); + TimeGenerator timestampGenerator = getTimeGenerator(context, queryPlan); List<Boolean> cached = markFilterdPaths( queryPlan.getExpression(), new ArrayList<>(queryPlan.getDeduplicatedPaths()), timestampGenerator.hasOrNode()); - List<IReaderByTimestamp> readersOfSelectedSeries = + Pair<List<IReaderByTimestamp>, List<List<Integer>>> pair = initSeriesReaderByTimestamp(context, queryPlan, cached, timestampGenerator.getTimeFilter()); + return new RawQueryDataSetWithValueFilter( queryPlan.getDeduplicatedPaths(), queryPlan.getDeduplicatedDataTypes(), timestampGenerator, - readersOfSelectedSeries, + pair.left, + pair.right, cached, queryPlan.isAscending()); } - protected List<IReaderByTimestamp> initSeriesReaderByTimestamp( + /** + * init IReaderByTimestamp for each not cached PartialPath, if it's already been cached, the + * corresponding IReaderByTimestamp will be null group these not cached PartialPath to one + * AlignedPath if they belong to same aligned device + * + * @return List<IReaderByTimestamp> if it's already been cached, the corresponding + * IReaderByTimestamp will be null List<List<Integer>> IReaderByTimestamp's corresponding + * index list to the result RowRecord. + */ + protected Pair<List<IReaderByTimestamp>, List<List<Integer>>> initSeriesReaderByTimestamp( QueryContext context, RawDataQueryPlan queryPlan, List<Boolean> cached, Filter timeFilter) throws QueryProcessException, StorageEngineException { List<IReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>(); + List<PartialPath> pathList = new ArrayList<>(); + List<PartialPath> notCachedPathList = new ArrayList<>(); + + // reader index -> deduplicated path index + List<List<Integer>> readerToIndexList = new ArrayList<>(); + // fullPath -> reader index + Map<String, Integer> fullPathToReaderIndexMap = new HashMap<>(); + List<PartialPath> deduplicatedPaths = queryPlan.getDeduplicatedPaths(); + int index = 0; + for (int i = 0; i < cached.size(); i++) { + if (cached.get(i)) { + pathList.add(deduplicatedPaths.get(i)); + readerToIndexList.add(Collections.singletonList(i)); + cached.set(index++, Boolean.TRUE); + } else { + notCachedPathList.add(deduplicatedPaths.get(i)); + // For aligned Path, it's deviceID; for nonAligned path, it's full path + String fullPath = deduplicatedPaths.get(i).getFullPath(); + Integer readerIndex = fullPathToReaderIndexMap.get(fullPath); + + // it's another sub sensor in aligned device, we just add it to the previous AlignedPath + if (readerIndex != null) { + AlignedPath anotherSubSensor = (AlignedPath) deduplicatedPaths.get(i); + ((AlignedPath) pathList.get(readerIndex)).mergeAlignedPath(anotherSubSensor); + readerToIndexList.get(readerIndex).add(i); + } else { + pathList.add(deduplicatedPaths.get(i)); + fullPathToReaderIndexMap.put(fullPath, index); + List<Integer> indexList = new ArrayList<>(); + indexList.add(i); + readerToIndexList.add(indexList); + cached.set(index++, Boolean.FALSE); + } + } + } + + queryPlan.setDeduplicatedPaths(pathList); + int previousSize = cached.size(); + if (previousSize > pathList.size()) { + cached.subList(pathList.size(), previousSize).clear(); + } + Pair<List<VirtualStorageGroupProcessor>, Map<VirtualStorageGroupProcessor, List<PartialPath>>> lockListAndProcessorToSeriesMapPair = - StorageEngine.getInstance().mergeLock(queryPlan.getDeduplicatedPaths()); + StorageEngine.getInstance().mergeLock(notCachedPathList); List<VirtualStorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left; Map<VirtualStorageGroupProcessor, List<PartialPath>> processorToSeriesMap = lockListAndProcessorToSeriesMapPair.right; @@ -213,7 +272,7 @@ public class RawDataQueryExecutor { } finally { StorageEngine.getInstance().mergeUnLock(lockList); } - return readersOfSelectedSeries; + return new Pair<>(readersOfSelectedSeries, readerToIndexList); } protected IReaderByTimestamp getReaderByTimestamp( diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/UDFQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/UDFQueryExecutor.java index 0b8299e..1a8f72c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/UDFQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/UDFQueryExecutor.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.executor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.path.MeasurementPath; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.dataset.UDFInputDataSet; @@ -31,10 +32,12 @@ import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; +import org.apache.iotdb.tsfile.utils.Pair; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths; @@ -55,16 +58,21 @@ public class UDFQueryExecutor extends RawDataQueryExecutor { public QueryDataSet executeWithValueFilterAlignByTime(QueryContext context) throws StorageEngineException, QueryProcessException, IOException { + // transfer to MeasurementPath to AlignedPath if it's under an aligned entity + queryPlan.setDeduplicatedPaths( + queryPlan.getDeduplicatedPaths().stream() + .map(p -> ((MeasurementPath) p).transformToExactPath()) + .collect(Collectors.toList())); TimeGenerator timestampGenerator = getTimeGenerator(context, udtfPlan); List<Boolean> cached = markFilterdPaths( udtfPlan.getExpression(), new ArrayList<>(udtfPlan.getDeduplicatedPaths()), timestampGenerator.hasOrNode()); - List<IReaderByTimestamp> readersOfSelectedSeries = + Pair<List<IReaderByTimestamp>, List<List<Integer>>> pair = initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter()); return new UDTFAlignByTimeDataSet( - context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached); + context, udtfPlan, timestampGenerator, pair.left, pair.right, cached); } public QueryDataSet executeWithoutValueFilterNonAlign(QueryContext context) @@ -75,16 +83,21 @@ public class UDFQueryExecutor extends RawDataQueryExecutor { public QueryDataSet executeWithValueFilterNonAlign(QueryContext context) throws QueryProcessException, StorageEngineException, IOException { + // transfer to MeasurementPath to AlignedPath if it's under an aligned entity + queryPlan.setDeduplicatedPaths( + queryPlan.getDeduplicatedPaths().stream() + .map(p -> ((MeasurementPath) p).transformToExactPath()) + .collect(Collectors.toList())); TimeGenerator timestampGenerator = getTimeGenerator(context, udtfPlan); List<Boolean> cached = markFilterdPaths( udtfPlan.getExpression(), new ArrayList<>(udtfPlan.getDeduplicatedPaths()), timestampGenerator.hasOrNode()); - List<IReaderByTimestamp> readersOfSelectedSeries = + Pair<List<IReaderByTimestamp>, List<List<Integer>>> pair = initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter()); return new UDTFNonAlignDataSet( - context, udtfPlan, timestampGenerator, readersOfSelectedSeries, cached); + context, udtfPlan, timestampGenerator, pair.left, pair.right, cached); } public final QueryDataSet executeFromAlignedDataSet( diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java index 9fcbb61..de96eb2 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java @@ -64,12 +64,14 @@ public class RawQueryInputLayer { List<TSDataType> dataTypes, TimeGenerator timeGenerator, List<IReaderByTimestamp> readers, + List<List<Integer>> readerToIndexList, List<Boolean> cached) throws QueryProcessException { construct( queryId, memoryBudgetInMB, - new RawQueryDataSetWithValueFilter(paths, dataTypes, timeGenerator, readers, cached, true)); + new RawQueryDataSetWithValueFilter( + paths, dataTypes, timeGenerator, readers, readerToIndexList, cached, true)); } public RawQueryInputLayer(long queryId, float memoryBudgetInMB, IUDFInputDataSet queryDataSet) diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/RowRecord.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/RowRecord.java index 748e8ab..5b98ca5 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/RowRecord.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/RowRecord.java @@ -59,6 +59,15 @@ public class RowRecord { } } + public void setField(Field f, int index) { + this.fields.set(index, f); + if (f == null || f.getDataType() == null) { + hasNullField = true; + } else { + allNull = false; + } + } + public void addField(Object value, TSDataType dataType) { this.fields.add(Field.getField(value, dataType)); if (value == null || dataType == null) { @@ -68,6 +77,15 @@ public class RowRecord { } } + public void setField(Object value, TSDataType dataType, int index) { + this.fields.set(index, Field.getField(value, dataType)); + if (value == null || dataType == null) { + hasNullField = true; + } else { + allNull = false; + } + } + @Override public String toString() { StringBuilder sb = new StringBuilder();
