This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch to0.12pushdownlimit in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 729b616ff993ef690ad8cb042c26e5ba3d6a4e87 Author: Alima777 <wxw19981...@gmail.com> AuthorDate: Fri Dec 17 11:24:11 2021 +0800 push down limit to read task in rawdatasetwithoutValueFilter --- .../cluster/query/ClusterDataQueryExecutor.java | 6 +---- .../dataset/RawQueryDataSetWithoutValueFilter.java | 26 ++++++++++++---------- .../UDFRawQueryInputDataSetWithoutValueFilter.java | 11 +++------ .../apache/iotdb/db/query/dataset/UDTFDataSet.java | 7 +----- .../db/query/executor/RawDataQueryExecutor.java | 6 +---- .../iotdb/db/query/udf/core/input/InputLayer.java | 9 +++----- 6 files changed, 23 insertions(+), 42 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java index 92f100a..3550bbf 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java @@ -85,11 +85,7 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor { try { List<ManagedSeriesReader> readersOfSelectedSeries = initMultSeriesReader(context); return new RawQueryDataSetWithoutValueFilter( - context.getQueryId(), - queryPlan.getDeduplicatedPaths(), - queryPlan.getDeduplicatedDataTypes(), - readersOfSelectedSeries, - queryPlan.isAscending()); + context.getQueryId(), queryPlan, readersOfSelectedSeries); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new StorageEngineException(e.getMessage()); diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java index a6c928e..a5f613d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java @@ -20,7 +20,7 @@ package org.apache.iotdb.db.query.dataset; import org.apache.iotdb.db.concurrent.WrappedRunnable; -import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; import org.apache.iotdb.db.query.control.QueryTimeManager; import org.apache.iotdb.db.query.pool.QueryTaskPoolManager; import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader; @@ -177,18 +177,23 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet /** * constructor of EngineDataSetWithoutValueFilter. * - * @param paths paths in List structure - * @param dataTypes time series data type * @param readers readers in List(IPointReader) structure */ public RawQueryDataSetWithoutValueFilter( - long queryId, - List<PartialPath> paths, - List<TSDataType> dataTypes, - List<ManagedSeriesReader> readers, - boolean ascending) + long queryId, RawDataQueryPlan queryPlan, List<ManagedSeriesReader> readers) throws IOException, InterruptedException { - super(new ArrayList<>(paths), dataTypes, ascending); + super( + new ArrayList<>(queryPlan.getDeduplicatedPaths()), + queryPlan.getDeduplicatedDataTypes(), + queryPlan.isAscending()); + this.rowLimit = queryPlan.getRowLimit(); + this.rowOffset = queryPlan.getRowOffset(); + this.withoutAnyNull = queryPlan.isWithoutAnyNull(); + this.withoutAllNull = queryPlan.isWithoutAllNull(); + if (rowLimit != 0 && !withoutAllNull && !withoutAnyNull) { + batchDataLengthList = new int[readers.size()]; + } + this.queryId = queryId; this.seriesReaderList = readers; blockingQueueArray = new BlockingQueue[readers.size()]; @@ -197,9 +202,6 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet } cachedBatchDataArray = new BatchData[readers.size()]; noMoreDataInQueueArray = new boolean[readers.size()]; - if (rowLimit != 0) { - batchDataLengthList = new int[readers.size()]; - } init(); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDFRawQueryInputDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDFRawQueryInputDataSetWithoutValueFilter.java index e7c4917..7c43df4 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDFRawQueryInputDataSetWithoutValueFilter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDFRawQueryInputDataSetWithoutValueFilter.java @@ -19,9 +19,8 @@ package org.apache.iotdb.db.query.dataset; -import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import java.io.IOException; import java.util.List; @@ -30,13 +29,9 @@ public class UDFRawQueryInputDataSetWithoutValueFilter extends RawQueryDataSetWi implements UDFInputDataSet { public UDFRawQueryInputDataSetWithoutValueFilter( - long queryId, - List<PartialPath> paths, - List<TSDataType> dataTypes, - List<ManagedSeriesReader> readers, - boolean ascending) + long queryId, UDTFPlan udtfPlan, List<ManagedSeriesReader> readers) throws IOException, InterruptedException { - super(queryId, paths, dataTypes, readers, ascending); + super(queryId, udtfPlan, readers); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java index 5761ef2..6dfbe7b 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java @@ -95,12 +95,7 @@ public abstract class UDTFDataSet extends QueryDataSet { queryId = queryContext.getQueryId(); this.udtfPlan = udtfPlan; inputLayer = - new InputLayer( - queryId, - UDF_READER_MEMORY_BUDGET_IN_MB, - deduplicatedPaths, - deduplicatedDataTypes, - readersOfSelectedSeries); + new InputLayer(queryId, UDF_READER_MEMORY_BUDGET_IN_MB, udtfPlan, readersOfSelectedSeries); udtfPlan.initializeUdfExecutors(queryId, UDF_COLLECTOR_MEMORY_BUDGET_IN_MB); initTransformers(UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java index ac13c1e..d480d97 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java @@ -67,11 +67,7 @@ public class RawDataQueryExecutor { List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context); try { return new RawQueryDataSetWithoutValueFilter( - context.getQueryId(), - queryPlan.getDeduplicatedPaths(), - queryPlan.getDeduplicatedDataTypes(), - readersOfSelectedSeries, - queryPlan.isAscending()); + context.getQueryId(), queryPlan, readersOfSelectedSeries); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new StorageEngineException(e.getMessage()); diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/input/InputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/input/InputLayer.java index f8dc99a..46eae97 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/input/InputLayer.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/input/InputLayer.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.udf.core.input; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithValueFilter; import org.apache.iotdb.db.query.dataset.UDFInputDataSet; import org.apache.iotdb.db.query.dataset.UDFRawQueryInputDataSetWithoutValueFilter; @@ -62,16 +63,12 @@ public class InputLayer { /** InputLayerWithoutValueFilter */ public InputLayer( - long queryId, - float memoryBudgetInMB, - List<PartialPath> paths, - List<TSDataType> dataTypes, - List<ManagedSeriesReader> readers) + long queryId, float memoryBudgetInMB, UDTFPlan udtfPlan, List<ManagedSeriesReader> readers) throws QueryProcessException, IOException, InterruptedException { constructInputLayer( queryId, memoryBudgetInMB, - new UDFRawQueryInputDataSetWithoutValueFilter(queryId, paths, dataTypes, readers, true)); + new UDFRawQueryInputDataSetWithoutValueFilter(queryId, udtfPlan, readers)); } /** InputLayerWithValueFilter */