This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch pushdownlimit in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 649cb4e98e1d1c3cae73d1d3b1daf31fc9020046 Author: Alima777 <[email protected]> AuthorDate: Tue Dec 7 16:59:16 2021 +0800 push down limit to read task in rawdatasetwithoutValueFilter --- .../dataset/RawQueryDataSetWithoutValueFilter.java | 32 ++++++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java index 5f83c74..78be9d2 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java @@ -58,12 +58,24 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet private final ManagedSeriesReader reader; private final String pathName; private final BlockingQueue<BatchData> blockingQueue; + // If rowLimit/rowOffset exists, fetchLimit = rowLimit + rowOffset, Otherwise Integer.Max + // It is only used when the readTask is initialized first time, to avoid read too much batchData + // If fetchLimit is still not satisfied when blockingQueue is filled, we don't use it again when + // the readTask is initialized again in fillCache() because it's also controlled in queryDataSet + // and fetchLimit is not easy to calculated when batchData is returned but not merged among + // series + private int fetchLimit; + private int batchDataTotalLength = 0; public ReadTask( - ManagedSeriesReader reader, BlockingQueue<BatchData> blockingQueue, String pathName) { + ManagedSeriesReader reader, + BlockingQueue<BatchData> blockingQueue, + String pathName, + int fetchLimit) { this.reader = reader; this.blockingQueue = blockingQueue; this.pathName = pathName; + this.fetchLimit = fetchLimit; } @Override @@ -87,8 +99,10 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet continue; } blockingQueue.put(batchData); - // if the queue also has free space, just submit another itself - if (blockingQueue.remainingCapacity() > 0) { + // if the queue also has free space and the size of batchData < fetchLimit, just submit + // another itself + batchDataTotalLength += batchData.length(); + if (batchDataTotalLength < fetchLimit && blockingQueue.remainingCapacity() > 0) { TASK_POOL_MANAGER.submit(this); } // the queue has no more space @@ -214,7 +228,8 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet reader.setHasRemaining(true); reader.setManagedByQueryManager(true); TASK_POOL_MANAGER.submit( - new ReadTask(reader, blockingQueueArray[i], paths.get(i).getFullPath())); + new ReadTask( + reader, blockingQueueArray[i], paths.get(i).getFullPath(), rowLimit + rowOffset)); } for (int i = 0; i < seriesReaderList.size(); i++) { // check the interrupted status of query before taking next batch @@ -538,7 +553,10 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet reader.setManagedByQueryManager(true); TASK_POOL_MANAGER.submit( new ReadTask( - reader, blockingQueueArray[seriesIndex], paths.get(seriesIndex).getFullPath())); + reader, + blockingQueueArray[seriesIndex], + paths.get(seriesIndex).getFullPath(), + Integer.MAX_VALUE)); } } } @@ -593,7 +611,9 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet } else { record.addField(cachedBatchDataArray[seriesIndex].currentValue(), dataType); } - cacheNext(seriesIndex); + if (alreadyReturnedRowNum < rowOffset + rowLimit) { + cacheNext(seriesIndex); + } } }
