Copilot commented on code in PR #16625:
URL: https://github.com/apache/pinot/pull/16625#discussion_r2283553572
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -42,39 +45,75 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+/// TODO: Use CID to manage the queries
Review Comment:
[nitpick] This TODO comment is unrelated to the changes in this PR. Consider
removing it or addressing it in a separate PR to maintain focus on the race
condition fix.
```suggestion
```
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1942,7 +1942,17 @@ public static class PlanVersions {
/// Max time to keep the op stats in the cache.
public static final String KEY_OF_OP_STATS_CACHE_EXPIRE_MS = "pinot.server.query.op.stats.cache.ms";
public static final int DEFAULT_OF_OP_STATS_CACHE_EXPIRE_MS = 60 * 1000;
+
+ /// Max number of cancelled queries to keep in the cache.
+ public static final String KEY_OF_CANCELLED_QUERY_CACHE_SIZE = "pinot.server.query.cancelled.cache.size";
+ public static final int DEFAULT_OF_CANCELLED_QUERY_CACHE_SIZE = 1000;
+
+ /// Max time to keep the cancelled queries in the cache.
+ public static final String KEY_OF_CANCELLED_QUERY_CACHE_EXPIRE_MS = "pinot.server.query.cancelled.cache.ms";
+ public static final int DEFAULT_OF_CANCELLED_QUERY_CACHE_EXPIRE_MS = 60 * 1000;
+
/// Timeout of the cancel request, in milliseconds.
+ /// TODO: This is used by the broker. Consider renaming it.
Review Comment:
[nitpick] This TODO comment is unrelated to the changes in this PR and
appears to be leftover from previous work. Consider removing it or addressing
it in a separate PR to keep this change focused.
```suggestion
```
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -113,11 +152,20 @@ public void runJob() {
}
}
});
- _opChainCache.put(operatorChain.getId(), operatorChain.getRoot());
- _submittedOpChainMap.put(operatorChain.getId(), scheduledFuture);
+ _opChainCache.put(opChainId, operatorChain.getRoot());
+ _submittedOpChainMap.put(opChainId, scheduledFuture);
}
public Map<Integer, MultiStageQueryStats.StageStats.Closed> cancel(long requestId) {
+ // Acquire write lock for the query to ensure that the query is not cancelled while scheduling the operator chain.
+ Lock writeLock = getQueryLock(requestId).writeLock();
+ writeLock.lock();
+ try {
+ _cancelledQueryCache.put(requestId, requestId);
Review Comment:
Using `requestId` as both the key and the value in the cache is redundant and
wastes memory. Consider storing a constant value instead, such as
`_cancelledQueryCache.put(requestId, Boolean.TRUE)`, or using a Set-like
structure that stores only the key.
```suggestion
_cancelledQueryCache.put(requestId, Boolean.TRUE);
```
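For the Set-like alternative, here is a minimal sketch using only the JDK. It is
not the PR's implementation: `CancelledQuerySet`, `markCancelled`, and
`isCancelled` are hypothetical names, and the sketch bounds the set by size
only, so it does not cover the time-based expiry configured by
`KEY_OF_CANCELLED_QUERY_CACHE_EXPIRE_MS`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Pinot: a size-bounded set of cancelled request ids.
final class CancelledQuerySet {
  private final Set<Long> _cancelled;

  CancelledQuerySet(int maxSize) {
    // Insertion-ordered map that drops its eldest entry once maxSize is exceeded.
    Map<Long, Boolean> bounded = new LinkedHashMap<Long, Boolean>() {
      @Override
      protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
        return size() > maxSize;
      }
    };
    // newSetFromMap exposes only the keys; synchronizedSet makes the view safe to share across threads.
    _cancelled = Collections.synchronizedSet(Collections.newSetFromMap(bounded));
  }

  // Records the request id as cancelled; no separate value is supplied by the caller.
  void markCancelled(long requestId) {
    _cancelled.add(requestId);
  }

  // Returns true if the request id was marked cancelled and has not been evicted.
  boolean isCancelled(long requestId) {
    return _cancelled.contains(requestId);
  }
}
```

Callers would then write `markCancelled(requestId)` in place of the `put` call.
If time-based expiry is required, keeping the existing cache and storing
`Boolean.TRUE` as suggested above is probably the simpler change.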
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]