xiangfu0 commented on code in PR #14698:
URL: https://github.com/apache/pinot/pull/14698#discussion_r3000126483


##########
pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java:
##########
@@ -257,6 +262,23 @@ private void applyQueryOptions(QueryContext queryContext) {
       } else {
         queryContext.setMinInitialIndexedTableCapacity(_minInitialIndexedTableCapacity);
       }
+      String groupByAlgorithm =
+          QueryOptionsUtils.normalizeGroupByAlgorithm(QueryOptionsUtils.getGroupByAlgorithm(queryOptions));
+      if (StringUtils.isNotEmpty(groupByAlgorithm)) {
+        queryContext.setGroupByAlgorithm(groupByAlgorithm);
+      } else {
+        groupByAlgorithm = DEFAULT_GROUP_BY_ALGORITHM;
+        queryContext.setGroupByAlgorithm(DEFAULT_GROUP_BY_ALGORITHM);
+      }
+      Integer numGroupByPartitions = QueryOptionsUtils.getNumGroupByPartitions(queryOptions);
+      if (numGroupByPartitions != null) {
+        queryContext.setNumGroupByPartitions(numGroupByPartitions);
+      } else if (PartitionedGroupByCombineOperator.ALGORITHM.equals(groupByAlgorithm)) {
+        queryContext.setNumGroupByPartitions(Math.max(DEFAULT_NUM_GROUP_BY_PARTITIONS,
+            queryContext.getMaxExecutionThreads()));
+      } else {

Review Comment:
   Fixed in 3dbe195191. The partition default is now derived from the effective
query parallelism: when maxExecutionThreads is auto (non-positive), it falls
back to QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY before choosing the
partition count. I also added a regression test in CombinePlanNodeTest to cover
the auto-thread case.
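
   For readers following the thread, here is a minimal sketch of the fallback
described above. It is not the code from 3dbe195191: the class and method names
are hypothetical, and the two constants are passed in as parameters because
their actual values are not shown in this thread. Keeping Math.max with the
default partition count is an assumption carried over from the original hunk.

```java
// Hypothetical sketch; names below are illustrative, not Pinot APIs.
final class PartitionDefaultSketch {
  private PartitionDefaultSketch() {
  }

  // A non-positive maxExecutionThreads is treated as "auto" and replaced by
  // the per-query thread cap (standing in for MAX_NUM_THREADS_PER_QUERY).
  static int effectiveThreads(int maxExecutionThreads, int maxThreadsPerQuery) {
    return maxExecutionThreads > 0 ? maxExecutionThreads : maxThreadsPerQuery;
  }

  // An explicit query option wins; otherwise use the larger of the default
  // partition count and the effective parallelism.
  static int resolveNumPartitions(Integer explicitPartitions, int maxExecutionThreads,
      int maxThreadsPerQuery, int defaultPartitions) {
    if (explicitPartitions != null) {
      return explicitPartitions;
    }
    return Math.max(defaultPartitions, effectiveThreads(maxExecutionThreads, maxThreadsPerQuery));
  }
}
```

   With an assumed cap of 8 and an assumed default of 4, an auto thread count
(-1) resolves to 8 partitions rather than to the default of 4, which is the
case the regression test is described as covering.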



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java:
##########
@@ -263,9 +263,29 @@ public int size() {
 
   @Override
   public Iterator<Record> iterator() {
+    if (_topRecords == null) {
+      return _lookupMap.values().iterator();
+    }
     return _topRecords.iterator();
   }
 
+
+  public void mergePartitionTable(Table table) {
+    if (table instanceof IndexedTable) {
+      _lookupMap.putAll(((IndexedTable) table)._lookupMap);
+    } else {
+      Iterator<Record> iterator = table.iterator();
+      while (iterator.hasNext()) {
+        // NOTE: For performance concern, does not check the return value of the upsert(). Override this method if
+        //       upsert() can return false.
+        upsert(iterator.next());
+      }
+    }
+    if (_lookupMap.size() >= _trimThreshold) {
+      resize();
+    }
+  }

Review Comment:
   Fixed in 3dbe195191. mergePartitionTable() now invalidates the cached 
_topRecords view before mutating the backing lookup map, so post-finish merges 
cannot serve stale ORDER BY results. IndexedTableTest now covers both the 
invalidation path and a re-finish that picks the new top record.
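
   To make the invalidation order concrete, here is a small self-contained
sketch of the pattern. It is not IndexedTable itself: the class, its String
keys and long values, and the put/finish/topKey methods are hypothetical
stand-ins, and putAll assumes the partitions hold disjoint keys, as in the
hunk above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a table that caches a sorted view after finish().
final class CachedTopTableSketch {
  private final Map<String, Long> _lookupMap = new HashMap<>();
  // Null means "no valid cached view"; set by finish(), cleared by merges.
  private List<String> _topKeys;

  void put(String key, long value) {
    _lookupMap.put(key, value);
  }

  // Builds the cached view: keys ordered by value, largest first.
  void finish() {
    List<String> keys = new ArrayList<>(_lookupMap.keySet());
    keys.sort((a, b) -> Long.compare(_lookupMap.get(b), _lookupMap.get(a)));
    _topKeys = keys;
  }

  boolean isFinished() {
    return _topKeys != null;
  }

  // Drops the cached view BEFORE mutating the backing map, so a reader can
  // never observe a sorted view that predates the merge.
  void mergePartition(CachedTopTableSketch other) {
    _topKeys = null;
    _lookupMap.putAll(other._lookupMap);
  }

  String topKey() {
    if (_topKeys == null) {
      throw new IllegalStateException("finish() must be called after the last merge");
    }
    return _topKeys.get(0);
  }
}
```

   In this sketch, merging a partition that holds a larger value after
finish() leaves the table unfinished, and a second finish() then reports the
newly merged key as the top record.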



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

