This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch to0.12pushdownlimit
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 729b616ff993ef690ad8cb042c26e5ba3d6a4e87
Author: Alima777 <wxw19981...@gmail.com>
AuthorDate: Fri Dec 17 11:24:11 2021 +0800

    push down limit to read task in rawdatasetwithoutValueFilter
---
 .../cluster/query/ClusterDataQueryExecutor.java    |  6 +----
 .../dataset/RawQueryDataSetWithoutValueFilter.java | 26 ++++++++++++----------
 .../UDFRawQueryInputDataSetWithoutValueFilter.java | 11 +++------
 .../apache/iotdb/db/query/dataset/UDTFDataSet.java |  7 +-----
 .../db/query/executor/RawDataQueryExecutor.java    |  6 +----
 .../iotdb/db/query/udf/core/input/InputLayer.java  |  9 +++-----
 6 files changed, 23 insertions(+), 42 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
index 92f100a..3550bbf 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
@@ -85,11 +85,7 @@ public class ClusterDataQueryExecutor extends 
RawDataQueryExecutor {
     try {
       List<ManagedSeriesReader> readersOfSelectedSeries = 
initMultSeriesReader(context);
       return new RawQueryDataSetWithoutValueFilter(
-          context.getQueryId(),
-          queryPlan.getDeduplicatedPaths(),
-          queryPlan.getDeduplicatedDataTypes(),
-          readersOfSelectedSeries,
-          queryPlan.isAscending());
+          context.getQueryId(), queryPlan, readersOfSelectedSeries);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new StorageEngineException(e.getMessage());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index a6c928e..a5f613d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.query.dataset;
 
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
-import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.control.QueryTimeManager;
 import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
@@ -177,18 +177,23 @@ public class RawQueryDataSetWithoutValueFilter extends 
QueryDataSet
   /**
    * constructor of EngineDataSetWithoutValueFilter.
    *
-   * @param paths paths in List structure
-   * @param dataTypes time series data type
    * @param readers readers in List(IPointReader) structure
    */
   public RawQueryDataSetWithoutValueFilter(
-      long queryId,
-      List<PartialPath> paths,
-      List<TSDataType> dataTypes,
-      List<ManagedSeriesReader> readers,
-      boolean ascending)
+      long queryId, RawDataQueryPlan queryPlan, List<ManagedSeriesReader> 
readers)
       throws IOException, InterruptedException {
-    super(new ArrayList<>(paths), dataTypes, ascending);
+    super(
+        new ArrayList<>(queryPlan.getDeduplicatedPaths()),
+        queryPlan.getDeduplicatedDataTypes(),
+        queryPlan.isAscending());
+    this.rowLimit = queryPlan.getRowLimit();
+    this.rowOffset = queryPlan.getRowOffset();
+    this.withoutAnyNull = queryPlan.isWithoutAnyNull();
+    this.withoutAllNull = queryPlan.isWithoutAllNull();
+    if (rowLimit != 0 && !withoutAllNull && !withoutAnyNull) {
+      batchDataLengthList = new int[readers.size()];
+    }
+
     this.queryId = queryId;
     this.seriesReaderList = readers;
     blockingQueueArray = new BlockingQueue[readers.size()];
@@ -197,9 +202,6 @@ public class RawQueryDataSetWithoutValueFilter extends 
QueryDataSet
     }
     cachedBatchDataArray = new BatchData[readers.size()];
     noMoreDataInQueueArray = new boolean[readers.size()];
-    if (rowLimit != 0) {
-      batchDataLengthList = new int[readers.size()];
-    }
     init();
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDFRawQueryInputDataSetWithoutValueFilter.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDFRawQueryInputDataSetWithoutValueFilter.java
index e7c4917..7c43df4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDFRawQueryInputDataSetWithoutValueFilter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDFRawQueryInputDataSetWithoutValueFilter.java
@@ -19,9 +19,8 @@
 
 package org.apache.iotdb.db.query.dataset;
 
-import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.io.IOException;
 import java.util.List;
@@ -30,13 +29,9 @@ public class UDFRawQueryInputDataSetWithoutValueFilter 
extends RawQueryDataSetWi
     implements UDFInputDataSet {
 
   public UDFRawQueryInputDataSetWithoutValueFilter(
-      long queryId,
-      List<PartialPath> paths,
-      List<TSDataType> dataTypes,
-      List<ManagedSeriesReader> readers,
-      boolean ascending)
+      long queryId, UDTFPlan udtfPlan, List<ManagedSeriesReader> readers)
       throws IOException, InterruptedException {
-    super(queryId, paths, dataTypes, readers, ascending);
+    super(queryId, udtfPlan, readers);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index 5761ef2..6dfbe7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -95,12 +95,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
     queryId = queryContext.getQueryId();
     this.udtfPlan = udtfPlan;
     inputLayer =
-        new InputLayer(
-            queryId,
-            UDF_READER_MEMORY_BUDGET_IN_MB,
-            deduplicatedPaths,
-            deduplicatedDataTypes,
-            readersOfSelectedSeries);
+        new InputLayer(queryId, UDF_READER_MEMORY_BUDGET_IN_MB, udtfPlan, 
readersOfSelectedSeries);
     udtfPlan.initializeUdfExecutors(queryId, 
UDF_COLLECTOR_MEMORY_BUDGET_IN_MB);
     initTransformers(UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB);
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index ac13c1e..d480d97 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -67,11 +67,7 @@ public class RawDataQueryExecutor {
     List<ManagedSeriesReader> readersOfSelectedSeries = 
initManagedSeriesReader(context);
     try {
       return new RawQueryDataSetWithoutValueFilter(
-          context.getQueryId(),
-          queryPlan.getDeduplicatedPaths(),
-          queryPlan.getDeduplicatedDataTypes(),
-          readersOfSelectedSeries,
-          queryPlan.isAscending());
+          context.getQueryId(), queryPlan, readersOfSelectedSeries);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new StorageEngineException(e.getMessage());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/udf/core/input/InputLayer.java 
b/server/src/main/java/org/apache/iotdb/db/query/udf/core/input/InputLayer.java
index f8dc99a..46eae97 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/udf/core/input/InputLayer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/udf/core/input/InputLayer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.udf.core.input;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithValueFilter;
 import org.apache.iotdb.db.query.dataset.UDFInputDataSet;
 import 
org.apache.iotdb.db.query.dataset.UDFRawQueryInputDataSetWithoutValueFilter;
@@ -62,16 +63,12 @@ public class InputLayer {
 
   /** InputLayerWithoutValueFilter */
   public InputLayer(
-      long queryId,
-      float memoryBudgetInMB,
-      List<PartialPath> paths,
-      List<TSDataType> dataTypes,
-      List<ManagedSeriesReader> readers)
+      long queryId, float memoryBudgetInMB, UDTFPlan udtfPlan, 
List<ManagedSeriesReader> readers)
       throws QueryProcessException, IOException, InterruptedException {
     constructInputLayer(
         queryId,
         memoryBudgetInMB,
-        new UDFRawQueryInputDataSetWithoutValueFilter(queryId, paths, 
dataTypes, readers, true));
+        new UDFRawQueryInputDataSetWithoutValueFilter(queryId, udtfPlan, 
readers));
   }
 
   /** InputLayerWithValueFilter */

Reply via email to