This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch 0.12pushdownlimit
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ec578a5a9c1bd5bb3e55a6f58e7ef9ffc592d529
Author: Alima777 <[email protected]>
AuthorDate: Wed Dec 8 09:56:35 2021 +0800

    Push down limit to ReadTask in RawQueryDataSetWithoutValueFilter
---
 .../dataset/RawQueryDataSetWithoutValueFilter.java | 40 ++++++++++++++++++++--
 1 file changed, 37 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 7640b08..a8d7e79 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -56,12 +56,23 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
     private final ManagedSeriesReader reader;
     private final String pathName;
     private final BlockingQueue<BatchData> blockingQueue;
+    private int[] batchDataLengthList;
+    private final int seriesIndex;
+    private final int fetchLimit;
 
     public ReadTask(
-        ManagedSeriesReader reader, BlockingQueue<BatchData> blockingQueue, String pathName) {
+        ManagedSeriesReader reader,
+        BlockingQueue<BatchData> blockingQueue,
+        String pathName,
+        int[] batchDataLengthList,
+        int seriesIndex,
+        int fetchLimit) {
       this.reader = reader;
       this.blockingQueue = blockingQueue;
       this.pathName = pathName;
+      this.batchDataLengthList = batchDataLengthList;
+      this.seriesIndex = seriesIndex;
+      this.fetchLimit = fetchLimit;
     }
 
     @Override
@@ -81,6 +92,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
               continue;
             }
             blockingQueue.put(batchData);
+
+            if (batchDataLengthList != null) {
+              batchDataLengthList[seriesIndex] += batchData.length();
+              if (batchDataLengthList[seriesIndex] >= fetchLimit) {
+                break;
+              }
+            }
             // if the queue also has free space, just submit another itself
             if (blockingQueue.remainingCapacity() > 0) {
               TASK_POOL_MANAGER.submit(this);
@@ -144,6 +162,8 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
 
   protected BatchData[] cachedBatchDataArray;
 
+  protected int[] batchDataLengthList;
+
   // capacity for blocking queue
   private static final int BLOCKING_QUEUE_CAPACITY = 5;
 
@@ -177,6 +197,9 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
     }
     cachedBatchDataArray = new BatchData[readers.size()];
     noMoreDataInQueueArray = new boolean[readers.size()];
+    if (rowLimit != 0) {
+      batchDataLengthList = new int[readers.size()];
+    }
     init();
   }
 
@@ -198,7 +221,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
       reader.setHasRemaining(true);
       reader.setManagedByQueryManager(true);
       TASK_POOL_MANAGER.submit(
-          new ReadTask(reader, blockingQueueArray[i], paths.get(i).getFullPath()));
+          new ReadTask(
+              reader,
+              blockingQueueArray[i],
+              paths.get(i).getFullPath(),
+              batchDataLengthList,
+              i,
+              rowLimit + rowOffset));
     }
     for (int i = 0; i < seriesReaderList.size(); i++) {
       // check the interrupted status of query before taking next batch
@@ -442,7 +471,12 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
             reader.setManagedByQueryManager(true);
             TASK_POOL_MANAGER.submit(
                 new ReadTask(
-                    reader, blockingQueueArray[seriesIndex], paths.get(seriesIndex).getFullPath()));
+                    reader,
+                    blockingQueueArray[seriesIndex],
+                    paths.get(seriesIndex).getFullPath(),
+                    batchDataLengthList,
+                    seriesIndex,
+                    rowLimit + rowOffset));
           }
         }
       }

Reply via email to