Copilot commented on code in PR #14698:
URL: https://github.com/apache/pinot/pull/14698#discussion_r2999403391
##########
pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java:
##########
@@ -257,6 +262,23 @@ private void applyQueryOptions(QueryContext queryContext) {
} else {
queryContext.setMinInitialIndexedTableCapacity(_minInitialIndexedTableCapacity);
}
+ String groupByAlgorithm =
+     QueryOptionsUtils.normalizeGroupByAlgorithm(QueryOptionsUtils.getGroupByAlgorithm(queryOptions));
+ if (StringUtils.isNotEmpty(groupByAlgorithm)) {
+ queryContext.setGroupByAlgorithm(groupByAlgorithm);
+ } else {
+ groupByAlgorithm = DEFAULT_GROUP_BY_ALGORITHM;
+ queryContext.setGroupByAlgorithm(DEFAULT_GROUP_BY_ALGORITHM);
+ }
+ Integer numGroupByPartitions = QueryOptionsUtils.getNumGroupByPartitions(queryOptions);
+ if (numGroupByPartitions != null) {
+ queryContext.setNumGroupByPartitions(numGroupByPartitions);
+ } else if (PartitionedGroupByCombineOperator.ALGORITHM.equals(groupByAlgorithm)) {
+ queryContext.setNumGroupByPartitions(Math.max(DEFAULT_NUM_GROUP_BY_PARTITIONS,
+     queryContext.getMaxExecutionThreads()));
+ } else {
Review Comment:
When `groupByAlgorithm` is `PARTITIONED` and `numGroupByPartitions` is not
explicitly set, this defaults the partition count to
`max(1, queryContext.getMaxExecutionThreads())`. In common configurations
`maxExecutionThreads` is `-1` (meaning “auto”), so the expression resolves to a
single partition, which defeats the purpose of partitioning (and contradicts the
PR description, which says the server picks the partition count based on query
parallelism). Consider deriving the effective parallelism first (e.g., treat a
non-positive `maxExecutionThreads` as
`QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY` or similar) before choosing
the default partition count.
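A minimal sketch of the suggested resolution, assuming the caller supplies the "auto" parallelism. `GroupByPartitionDefaults` and `defaultNumGroupByPartitions` are hypothetical names, not Pinot APIs; the `autoParallelism` parameter stands in for `QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY` (or whatever the server actually uses), and `defaultNumPartitions` stands in for `DEFAULT_NUM_GROUP_BY_PARTITIONS`.

```java
/**
 * Hypothetical helper (not part of Pinot): resolve a non-positive
 * maxExecutionThreads ("auto") to a concrete parallelism before using it
 * to pick the default PARTITIONED group-by partition count.
 */
public final class GroupByPartitionDefaults {

  private GroupByPartitionDefaults() {
  }

  /**
   * @param maxExecutionThreads  value from the query context; non-positive means "auto"
   * @param defaultNumPartitions lower bound (assumed stand-in for DEFAULT_NUM_GROUP_BY_PARTITIONS)
   * @param autoParallelism      parallelism assumed for "auto" (assumed stand-in for
   *                             something like QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY)
   */
  public static int defaultNumGroupByPartitions(int maxExecutionThreads, int defaultNumPartitions,
      int autoParallelism) {
    // Only trust maxExecutionThreads when it is an explicit positive thread count.
    int effectiveParallelism = maxExecutionThreads > 0 ? maxExecutionThreads : autoParallelism;
    return Math.max(defaultNumPartitions, effectiveParallelism);
  }

  public static void main(String[] args) {
    // Assumption for the demo only: use the JVM's processor count as the "auto" parallelism.
    int autoParallelism = Runtime.getRuntime().availableProcessors();
    // Expression in the PR with maxExecutionThreads = -1: collapses to one partition.
    System.out.println("current:  " + Math.max(1, -1));
    // Suggested behavior: "auto" maps to the assumed parallelism instead.
    System.out.println("proposed: " + defaultNumGroupByPartitions(-1, 1, autoParallelism));
    // An explicit thread count is still honored.
    System.out.println("explicit: " + defaultNumGroupByPartitions(8, 1, autoParallelism));
  }
}
```

Keeping the lower bound as a separate argument preserves the existing `Math.max(DEFAULT_NUM_GROUP_BY_PARTITIONS, ...)` shape, so only the handling of the "auto" value changes.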
##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java:
##########
@@ -263,9 +263,29 @@ public int size() {
@Override
public Iterator<Record> iterator() {
+ if (_topRecords == null) {
+ return _lookupMap.values().iterator();
+ }
return _topRecords.iterator();
}
+
+ public void mergePartitionTable(Table table) {
+ if (table instanceof IndexedTable) {
+ _lookupMap.putAll(((IndexedTable) table)._lookupMap);
+ } else {
+ Iterator<Record> iterator = table.iterator();
+ while (iterator.hasNext()) {
+ // NOTE: For performance concern, does not check the return value of the upsert(). Override this method if
+ // upsert() can return false.
+ upsert(iterator.next());
+ }
+ }
+ if (_lookupMap.size() >= _trimThreshold) {
+ resize();
+ }
+ }
Review Comment:
`mergePartitionTable()` mutates `_lookupMap` but does not
invalidate/recompute `_topRecords`. If `finish()` has already been called for
an ORDER BY table, `_topRecords` is a detached collection from `_lookupMap`, so
subsequent `iterator()`/`size()` calls can ignore newly merged records. Consider
either enforcing a precondition that `mergePartitionTable()` is only called
before `finish()` (e.g., fail fast when `_topRecords != null`), or clearing
`_topRecords` so that future iteration reflects the merged state (re-running
`finish()` if needed).
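The failure mode and the fail-fast option can be sketched with a toy model. `ToyIndexedTable` is hypothetical and much simpler than Pinot's `IndexedTable`: records are plain `Integer` values keyed by `String`, and `finish()` is assumed to mimic the ORDER BY path by copying the top values into a detached list.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Toy model (not Pinot's IndexedTable): once finish() snapshots the top records
 * into a detached list, later merges into the lookup map are invisible to
 * iterator()/size(). The merge below fails fast in that state.
 */
public class ToyIndexedTable {
  private final Map<String, Integer> _lookupMap = new HashMap<>();
  private final int _limit;
  private List<Integer> _topRecords; // null until finish() is called

  public ToyIndexedTable(int limit) {
    _limit = limit;
  }

  public void upsert(String key, int value) {
    _lookupMap.put(key, value);
  }

  /** Sorts descending and keeps the top _limit values in a list detached from _lookupMap. */
  public void finish() {
    List<Integer> sorted = new ArrayList<>(_lookupMap.values());
    sorted.sort(Comparator.reverseOrder());
    _topRecords = new ArrayList<>(sorted.subList(0, Math.min(_limit, sorted.size())));
  }

  public int size() {
    return _topRecords != null ? _topRecords.size() : _lookupMap.size();
  }

  public Iterator<Integer> iterator() {
    return _topRecords != null ? _topRecords.iterator() : _lookupMap.values().iterator();
  }

  /** Fail-fast option from the review: reject merges after finish() instead of silently losing them. */
  public void mergePartitionTable(ToyIndexedTable other) {
    if (_topRecords != null) {
      throw new IllegalStateException("mergePartitionTable() must be called before finish()");
    }
    // Mirrors the PR's _lookupMap.putAll(...).
    _lookupMap.putAll(other._lookupMap);
  }

  public static void main(String[] args) {
    ToyIndexedTable a = new ToyIndexedTable(10);
    a.upsert("x", 1);
    ToyIndexedTable b = new ToyIndexedTable(10);
    b.upsert("y", 2);

    a.mergePartitionTable(b); // allowed: finish() has not been called yet
    a.finish();
    System.out.println(a.size()); // prints 2

    try {
      a.mergePartitionTable(b); // rejected: _topRecords is already populated
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

The alternative the comment mentions would replace the `throw` with `_topRecords = null;`, so that `iterator()` and `size()` fall back to `_lookupMap` until the caller runs `finish()` again.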
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]