This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch 0.12pushdownlimit in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ec578a5a9c1bd5bb3e55a6f58e7ef9ffc592d529 Author: Alima777 <[email protected]> AuthorDate: Wed Dec 8 09:56:35 2021 +0800 Push down the row limit to ReadTask in RawQueryDataSetWithoutValueFilter --- .../dataset/RawQueryDataSetWithoutValueFilter.java | 40 ++++++++++++++++++++-- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java index 7640b08..a8d7e79 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java @@ -56,12 +56,23 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet private final ManagedSeriesReader reader; private final String pathName; private final BlockingQueue<BatchData> blockingQueue; + private int[] batchDataLengthList; + private final int seriesIndex; + private final int fetchLimit; public ReadTask( - ManagedSeriesReader reader, BlockingQueue<BatchData> blockingQueue, String pathName) { + ManagedSeriesReader reader, + BlockingQueue<BatchData> blockingQueue, + String pathName, + int[] batchDataLengthList, + int seriesIndex, + int fetchLimit) { this.reader = reader; this.blockingQueue = blockingQueue; this.pathName = pathName; + this.batchDataLengthList = batchDataLengthList; + this.seriesIndex = seriesIndex; + this.fetchLimit = fetchLimit; } @Override @@ -81,6 +92,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet continue; } blockingQueue.put(batchData); + + if (batchDataLengthList != null) { + batchDataLengthList[seriesIndex] += batchData.length(); + if (batchDataLengthList[seriesIndex] >= fetchLimit) { + break; + } + } // if the queue also has free space, just submit another itself if (blockingQueue.remainingCapacity() > 0) { TASK_POOL_MANAGER.submit(this); @@ -144,6 
+162,8 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet protected BatchData[] cachedBatchDataArray; + protected int[] batchDataLengthList; + // capacity for blocking queue private static final int BLOCKING_QUEUE_CAPACITY = 5; @@ -177,6 +197,9 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet } cachedBatchDataArray = new BatchData[readers.size()]; noMoreDataInQueueArray = new boolean[readers.size()]; + if (rowLimit != 0) { + batchDataLengthList = new int[readers.size()]; + } init(); } @@ -198,7 +221,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet reader.setHasRemaining(true); reader.setManagedByQueryManager(true); TASK_POOL_MANAGER.submit( - new ReadTask(reader, blockingQueueArray[i], paths.get(i).getFullPath())); + new ReadTask( + reader, + blockingQueueArray[i], + paths.get(i).getFullPath(), + batchDataLengthList, + i, + rowLimit + rowOffset)); } for (int i = 0; i < seriesReaderList.size(); i++) { // check the interrupted status of query before taking next batch @@ -442,7 +471,12 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet reader.setManagedByQueryManager(true); TASK_POOL_MANAGER.submit( new ReadTask( - reader, blockingQueueArray[seriesIndex], paths.get(seriesIndex).getFullPath())); + reader, + blockingQueueArray[seriesIndex], + paths.get(seriesIndex).getFullPath(), + batchDataLengthList, + seriesIndex, + rowLimit + rowOffset)); } } }
