This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch QueryBlocking in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0fd2015fab8c5a9cba95b0700bdb4d9071358c8e Author: JackieTien97 <[email protected]> AuthorDate: Tue Jan 4 16:14:18 2022 +0800 fix query dead lock --- .../dataset/RawQueryDataSetWithoutValueFilter.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java index d739d10..21d612e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java @@ -94,12 +94,20 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet } blockingQueue.put(batchData); + // has limit clause if (batchDataLengthList != null) { batchDataLengthList[seriesIndex] += batchData.length(); if (batchDataLengthList[seriesIndex] >= fetchLimit) { - break; + // the queue has enough space to hold SignalBatchData, just break the while loop + if (blockingQueue.remainingCapacity() > 0) { + break; + } else { // otherwise, exit without putting SignalBatchData, main thread will submit a new task again, then it will put SignalBatchData successfully + reader.setManagedByQueryManager(false); + return; + } } } + // if the queue also has free space, just submit another itself if (blockingQueue.remainingCapacity() > 0) { TASK_POOL_MANAGER.submit(this); @@ -171,6 +179,9 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet private final long queryId; + // this field record the original value of offset clause, won't change during the query execution + protected final int originalRowOffset; + private static final RawQueryReadTaskPoolManager TASK_POOL_MANAGER = RawQueryReadTaskPoolManager.getInstance(); @@ -190,6 +201,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet queryPlan.getDeduplicatedDataTypes(), queryPlan.isAscending()); this.rowLimit = queryPlan.getRowLimit(); + this.originalRowOffset = queryPlan.getRowOffset(); this.rowOffset = queryPlan.getRowOffset(); this.withoutAnyNull = queryPlan.isWithoutAnyNull(); this.withoutAllNull = queryPlan.isWithoutAllNull(); @@ -215,6 +227,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet */ public RawQueryDataSetWithoutValueFilter(long queryId) { this.queryId = queryId; + this.originalRowOffset = 0; blockingQueueArray = new BlockingQueue[0]; timeHeap = new TimeSelector(0, ascending); } @@ -246,7 +259,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet paths.get(seriesIndex).getFullPath(), batchDataLengthList, seriesIndex, - rowLimit + rowOffset); + rowLimit + originalRowOffset); } /**
