Jackie-Jiang commented on code in PR #16308:
URL: https://github.com/apache/pinot/pull/16308#discussion_r2264093177


##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java:
##########
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+
+
+/**
+ * Util class used for merging of sorted group-by aggregation
+ */
+public class SortedRecordTable extends BaseTable {
+  private final ExecutorService _executorService;
+  protected final int _resultSize;

Review Comment:
   Any specific reason for the `protected` fields? Do you plan to extend this 
class?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java:
##########
@@ -132,6 +132,11 @@ public class QueryContext {
   private int _numThreadsExtractFinalResult = 
InstancePlanMakerImplV2.DEFAULT_NUM_THREADS_EXTRACT_FINAL_RESULT;
   // Parallel chunk size for final reduce
   private int _chunkSizeExtractFinalResult = 
InstancePlanMakerImplV2.DEFAULT_CHUNK_SIZE_EXTRACT_FINAL_RESULT;
+  // Threshold to use sort aggregate for safeTrim case when LIMIT is below this
+  private int _sortAggregateLimitThreshold = 
Server.DEFAULT_SORT_AGGREGATE_LIMIT_THRESHOLD;
+  // Threshold of number of segments to combine to use single-threaded 
sequential combine instead pair-wise
+  // This is defaulted to number of available cores
+  private int _sortAggregateSingleThreadedNumSegmentsThreshold = 
Runtime.getRuntime().availableProcessors();

Review Comment:
   Let's cache this value in `CommonConstants` to avoid calling
`Runtime.getRuntime().availableProcessors()` on a per-query basis
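
   A minimal sketch of the idea, assuming a constants holder is an acceptable
   home for the value. `CachedRuntimeConstants` and `QueryContextSketch` are
   hypothetical names used only for illustration; they are not classes in
   Pinot.

```java
// Hypothetical holder; the review suggests CommonConstants as the real location.
final class CachedRuntimeConstants {
  private CachedRuntimeConstants() {
  }

  // Evaluated once when the class is initialized, then shared by every query.
  static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
}

// Hypothetical stand-in for the per-query context object.
final class QueryContextSketch {
  // The field initializer now reads the cached constant instead of the runtime.
  private int _sortAggregateSingleThreadedNumSegmentsThreshold =
      CachedRuntimeConstants.AVAILABLE_PROCESSORS;

  int getSortAggregateSingleThreadedNumSegmentsThreshold() {
    return _sortAggregateSingleThreadedNumSegmentsThreshold;
  }
}
```

   With this shape, constructing a new context only copies an `int`.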



##########
pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java:
##########
@@ -58,6 +63,109 @@ public static Comparator<Object[]> 
getComparator(List<OrderByExpressionContext>
     return getComparator(orderByExpressions, nullHandlingEnabled, 0, 
orderByExpressions.size());
   }
 
+  /**
+   * get orderBy expressions on the groupBy keys when orderBy keys match 
groupBy keys

Review Comment:
   (minor, convention) We usually capitalize the javadoc. Same for other places



##########
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java:
##########
@@ -314,7 +317,8 @@ public AggregationFunction[] getAggregationFunctions() {
   }
 
   /**
-   * Returns the filtered aggregation functions for a query, or {@code null} 
if the query does not have any aggregation.
+   * Returns the filtered aggregation functions for a query, or {@code null} 
if the query does not have any
+   * aggregation.

Review Comment:
   (format) Seems the formatter is not set up correctly. Also, please disable 
auto-format for javadoc and comments, and revert the unnecessary reformatting 
in this PR



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java:
##########
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+
+
+/**
+ * Util class used for merging of sorted group-by aggregation

Review Comment:
   This is not really a util class



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java:
##########
@@ -34,4 +34,8 @@ public class IntermediateRecord {
     _record = record;
     _values = values;
   }
+
+  static IntermediateRecord createForTest(Key key, Record record, Comparable[] 
values) {

Review Comment:
   Not needed



##########
pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java:
##########
@@ -58,6 +63,109 @@ public static Comparator<Object[]> 
getComparator(List<OrderByExpressionContext>
     return getComparator(orderByExpressions, nullHandlingEnabled, 0, 
orderByExpressions.size());
   }
 
+  /**
+   * get orderBy expressions on the groupBy keys when orderBy keys match 
groupBy keys
+   */
+  public static Comparator<Record> 
getRecordKeyComparator(List<OrderByExpressionContext> orderByExpressions,
+      List<ExpressionContext> groupByExpressions, boolean nullHandlingEnabled) 
{
+    List<OrderByExpressionWithIndex> groupKeyOrderByExpressions =
+        
getGroupKeyOrderByExpressionFromRowOrderByExpressions(orderByExpressions, 
groupByExpressions);
+    Comparator<Object[]> valueComparator = 
getComparatorWithIndex(groupKeyOrderByExpressions, nullHandlingEnabled);
+    return (k1, k2) -> valueComparator.compare(k1.getValues(), k2.getValues());
+  }
+
+  private static Map<String, Integer> 
getGroupByExpressionIndexMap(List<ExpressionContext> groupByExpressions) {
+    Map<String, Integer> groupByExpressionIndexMap = new HashMap<>();
+    int numGroupByExpressions = groupByExpressions.size();
+    for (int i = 0; i < numGroupByExpressions; i++) {
+      groupByExpressionIndexMap.put(groupByExpressions.get(i).getIdentifier(), 
i);
+    }
+    return groupByExpressionIndexMap;
+  }
+
+  /**
+   * orderby expression with an index with respect to its position in the 
group keys
+   */
+  public static class OrderByExpressionWithIndex {
+    OrderByExpressionContext _orderByExpressionContext;
+    Integer _index;

Review Comment:
   Should they be `final`?



##########
pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java:
##########
@@ -389,10 +389,11 @@ public void testCovarianceAggregationGroupBy() {
     GroupByResultsBlock resultsBlock = groupByOperator.nextBlock();
     
QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(),
 NUM_RECORDS, 0,
         NUM_RECORDS * 2, NUM_RECORDS);
-    AggregationGroupByResult aggregationGroupByResult = 
resultsBlock.getAggregationGroupByResult();
+    // TODO: change all aggregationGroupByResult to intermediateRecord
+    List<IntermediateRecord> aggregationGroupByResult = 
resultsBlock.getIntermediateRecords();

Review Comment:
   (minor) Rename the variable to `resultRecords`, same for other places



##########
pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java:
##########
@@ -212,4 +217,31 @@ private static IndexedTable 
getTrimEnabledIndexedTable(DataSchema dataSchema, bo
           initialCapacity, executorService);
     }
   }
+
+  /**
+   * do sort aggregate when is safeTrim (order by group keys with no having 
clause)
+   * and limit is smaller than threshold
+   * TODO: we also want to do sort aggregate under order by group key with 
having case,
+   *   in this case we can check if the calculated Server trimSize is < 
sortAggregateLimitThreshold
+   *   if so, we do sort aggregate and trim to trimSize during combine.
+   *   This requires extracting Server trimSize calculation logic into 
QueryContext as pre-req
+   */
+  public static boolean shouldSortAggregateUnderSafeTrim(QueryContext 
queryContext) {

Review Comment:
   We can also move this into `QueryContext`



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java:
##########
@@ -206,6 +210,20 @@ protected GroupByResultsBlock getNextBlock() {
         
resultsBlock.setNumGroupsWarningLimitReached(numGroupsWarningLimitReached);
         return resultsBlock;
       }
+      if (GroupByUtils.shouldSortAggregateUnderSafeTrim(_queryContext)) {

Review Comment:
   (minor) Make the code consistent with `GroupByOperator`. This way it is 
easier to extract common code in the future. We may also add a TODO to extract 
common code



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java:
##########
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SortedRecordTable;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryErrorMessage;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.spi.trace.Tracing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Combine operator for group-by queries.
+ */
+@SuppressWarnings("rawtypes")
+public class SortedGroupByCombineOperator extends 
BaseSingleBlockCombineOperator<GroupByResultsBlock> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SortedGroupByCombineOperator.class);
+  private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY";
+
+  // We use a CountDownLatch to track if all Futures are finished by the query 
timeout, and cancel the unfinished
+  // _futures (try to interrupt the execution if it already started).
+  private final CountDownLatch _operatorLatch;
+
+  private volatile boolean _groupsTrimmed;
+  private volatile boolean _numGroupsLimitReached;
+  private volatile boolean _numGroupsWarningLimitReached;
+
+  private final AtomicReference<SortedRecordTable> _waitingTable;
+  private final AtomicReference<SortedRecordTable> _satisfiedTable;
+  private final Comparator<Record> _recordKeyComparator;
+
+  public SortedGroupByCombineOperator(List<Operator> operators, QueryContext 
queryContext,
+      ExecutorService executorService) {
+    super(null, operators, overrideMaxExecutionThreads(queryContext, 
operators.size()), executorService);
+
+    assert (GroupByUtils.shouldSortAggregateUnderSafeTrim(queryContext));
+    _operatorLatch = new CountDownLatch(_numTasks);
+    _waitingTable = new AtomicReference<>();
+    _satisfiedTable = new AtomicReference<>();
+    _recordKeyComparator = 
OrderByComparatorFactory.getRecordKeyComparator(queryContext.getOrderByExpressions(),
+        queryContext.getGroupByExpressions(), 
queryContext.isNullHandlingEnabled());
+  }
+
+  /**
+   * For group-by queries, when maxExecutionThreads is not explicitly 
configured, override it to create as many tasks as
+   * the default number of query worker threads (or the number of operators / 
segments if that's lower).
+   */
+  private static QueryContext overrideMaxExecutionThreads(QueryContext 
queryContext, int numOperators) {
+    int maxExecutionThreads = queryContext.getMaxExecutionThreads();
+    if (maxExecutionThreads <= 0) {
+      queryContext.setMaxExecutionThreads(Math.min(numOperators, 
ResourceManager.DEFAULT_QUERY_WORKER_THREADS));
+    }
+    return queryContext;
+  }
+
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  /**
+   * Executes query on one sorted segment in a worker thread and merges the 
results into the sorted record table.
+   */
+  @Override
+  protected void processSegments() {
+    int operatorId;
+    while (_processingException.get() == null && (operatorId = 
_nextOperatorId.getAndIncrement()) < _numOperators) {
+      Operator operator = _operators.get(operatorId);
+      try {
+        if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+          ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+        }
+        GroupByResultsBlock resultsBlock = (GroupByResultsBlock) 
operator.nextBlock();
+        if (resultsBlock.isGroupsTrimmed()) {
+          _groupsTrimmed = true;
+        }
+        // Set groups limit reached flag.
+        if (resultsBlock.isNumGroupsLimitReached()) {
+          _numGroupsLimitReached = true;
+        }
+        if (resultsBlock.isNumGroupsWarningLimitReached()) {
+          _numGroupsWarningLimitReached = true;
+        }
+        // short-circuit one segment case
+        if (_numOperators == 1) {
+          _satisfiedTable.set(
+              GroupByUtils.getAndPopulateSortedRecordTable(resultsBlock, 
_queryContext,
+                  _queryContext.getLimit(), _executorService, _numOperators, 
_recordKeyComparator)
+          );
+          break;
+        }
+        // save one call to getAndPopulateLinkedHashMapIndexedTable
+        //  by merging the current block in if there is a waitingTable
+        SortedRecordTable waitingTable = _waitingTable.getAndUpdate(v -> v == 
null

Review Comment:
   Alternatively, we can also make a class to help sort records (or 
intermediate records), then in the end construct a table with the final records
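
   A rough sketch of what such a helper could look like, assuming each input
   is already sorted by group key and contains each key at most once.
   `SortedMergerSketch`, `mergeSorted`, and the `combiner` parameter are
   hypothetical; the combiner stands in for merging the aggregation results
   of two records with the same key.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical helper: merges two key-sorted lists and keeps the first `limit` records.
final class SortedMergerSketch {
  private SortedMergerSketch() {
  }

  static <T> List<T> mergeSorted(List<T> left, List<T> right, Comparator<T> keyComparator,
      BinaryOperator<T> combiner, int limit) {
    List<T> merged = new ArrayList<>();
    int i = 0;
    int j = 0;
    while (merged.size() < limit && (i < left.size() || j < right.size())) {
      if (i == left.size()) {
        merged.add(right.get(j++));
      } else if (j == right.size()) {
        merged.add(left.get(i++));
      } else {
        int cmp = keyComparator.compare(left.get(i), right.get(j));
        if (cmp < 0) {
          merged.add(left.get(i++));
        } else if (cmp > 0) {
          merged.add(right.get(j++));
        } else {
          // Same group key on both sides: combine into a single record.
          merged.add(combiner.apply(left.get(i++), right.get(j++)));
        }
      }
    }
    return merged;
  }
}
```

   The final table would then be built once from the merged list, rather than
   keeping a table instance alive through every intermediate merge.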



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java:
##########
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SortedRecordTable;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryErrorMessage;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.spi.trace.Tracing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Combine operator for group-by queries.
+ */
+@SuppressWarnings("rawtypes")
+public class SortedGroupByCombineOperator extends 
BaseSingleBlockCombineOperator<GroupByResultsBlock> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SortedGroupByCombineOperator.class);
+  private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY";
+
+  // We use a CountDownLatch to track if all Futures are finished by the query 
timeout, and cancel the unfinished
+  // _futures (try to interrupt the execution if it already started).
+  private final CountDownLatch _operatorLatch;
+
+  private volatile boolean _groupsTrimmed;
+  private volatile boolean _numGroupsLimitReached;
+  private volatile boolean _numGroupsWarningLimitReached;
+
+  private final AtomicReference<SortedRecordTable> _waitingTable;
+  private final AtomicReference<SortedRecordTable> _satisfiedTable;
+  private final Comparator<Record> _recordKeyComparator;
+
+  public SortedGroupByCombineOperator(List<Operator> operators, QueryContext 
queryContext,
+      ExecutorService executorService) {
+    super(null, operators, overrideMaxExecutionThreads(queryContext, 
operators.size()), executorService);
+
+    assert (GroupByUtils.shouldSortAggregateUnderSafeTrim(queryContext));
+    _operatorLatch = new CountDownLatch(_numTasks);
+    _waitingTable = new AtomicReference<>();
+    _satisfiedTable = new AtomicReference<>();
+    _recordKeyComparator = 
OrderByComparatorFactory.getRecordKeyComparator(queryContext.getOrderByExpressions(),
+        queryContext.getGroupByExpressions(), 
queryContext.isNullHandlingEnabled());
+  }
+
+  /**
+   * For group-by queries, when maxExecutionThreads is not explicitly 
configured, override it to create as many tasks as
+   * the default number of query worker threads (or the number of operators / 
segments if that's lower).
+   */
+  private static QueryContext overrideMaxExecutionThreads(QueryContext 
queryContext, int numOperators) {
+    int maxExecutionThreads = queryContext.getMaxExecutionThreads();
+    if (maxExecutionThreads <= 0) {
+      queryContext.setMaxExecutionThreads(Math.min(numOperators, 
ResourceManager.DEFAULT_QUERY_WORKER_THREADS));
+    }
+    return queryContext;
+  }
+
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  /**
+   * Executes query on one sorted segment in a worker thread and merges the 
results into the sorted record table.
+   */
+  @Override
+  protected void processSegments() {
+    int operatorId;
+    while (_processingException.get() == null && (operatorId = 
_nextOperatorId.getAndIncrement()) < _numOperators) {
+      Operator operator = _operators.get(operatorId);
+      try {
+        if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+          ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+        }
+        GroupByResultsBlock resultsBlock = (GroupByResultsBlock) 
operator.nextBlock();
+        if (resultsBlock.isGroupsTrimmed()) {
+          _groupsTrimmed = true;
+        }
+        // Set groups limit reached flag.
+        if (resultsBlock.isNumGroupsLimitReached()) {
+          _numGroupsLimitReached = true;
+        }
+        if (resultsBlock.isNumGroupsWarningLimitReached()) {
+          _numGroupsWarningLimitReached = true;
+        }
+        // short-circuit one segment case
+        if (_numOperators == 1) {
+          _satisfiedTable.set(
+              GroupByUtils.getAndPopulateSortedRecordTable(resultsBlock, 
_queryContext,
+                  _queryContext.getLimit(), _executorService, _numOperators, 
_recordKeyComparator)
+          );
+          break;
+        }
+        // save one call to getAndPopulateLinkedHashMapIndexedTable
+        //  by merging the current block in if there is a waitingTable
+        SortedRecordTable waitingTable = _waitingTable.getAndUpdate(v -> v == 
null

Review Comment:
   (MAJOR) We should not do expensive operations here as the same operation 
might be applied multiple times. The conversion from `GroupByResultsBlock` to 
`SortedRecordTable` should happen in a single thread context
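
   For context, the javadoc of `AtomicReference.getAndUpdate` notes that the
   update function may be re-applied when an attempted update fails due to
   contention, so it should be cheap and side-effect-free. One possible shape,
   sketched below with strings standing in for blocks and tables, is to do the
   conversion once outside the atomic operation and keep only a
   compare-and-set inside the retry loop. `WaitingSlotSketch`, `convert`, and
   `parkOrTake` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: strings stand in for GroupByResultsBlock / SortedRecordTable.
final class WaitingSlotSketch {
  // Counts how often the "expensive" conversion actually runs.
  static final AtomicInteger CONVERSIONS = new AtomicInteger();

  private WaitingSlotSketch() {
  }

  // Stand-in for the expensive block-to-table conversion.
  static String convert(String block) {
    CONVERSIONS.incrementAndGet();
    return "table(" + block + ")";
  }

  // Parks the converted table if the slot is empty and returns null; otherwise
  // takes the waiting table out of the slot and returns it paired with ours.
  static String parkOrTake(AtomicReference<String> slot, String block) {
    // Runs exactly once per block, in the calling thread, outside the retry loop.
    String table = convert(block);
    while (true) {
      String waiting = slot.get();
      if (waiting == null) {
        if (slot.compareAndSet(null, table)) {
          return null;
        }
      } else if (slot.compareAndSet(waiting, null)) {
        return waiting + "+" + table;
      }
      // Lost a race: only the cheap read and compare-and-set are retried.
    }
  }
}
```

   Under contention the loop may spin a few times, but the conversion count
   stays at one per block.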



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java:
##########
@@ -123,7 +123,7 @@ protected void updateExistingRecord(Key key, Record 
newRecord) {
     _lookupMap.computeIfPresent(key, (k, v) -> updateRecord(v, newRecord));
   }
 
-  private Record updateRecord(Record existingRecord, Record newRecord) {
+  protected Record updateRecord(Record existingRecord, Record newRecord) {

Review Comment:
   (minor) Seems not needed



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java:
##########
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+
+
+/**
+ * Util class used for merging of sorted group-by aggregation
+ */
+public class SortedRecordTable extends BaseTable {
+  private final ExecutorService _executorService;
+  protected final int _resultSize;
+  protected final int _numKeyColumns;
+  protected final AggregationFunction[] _aggregationFunctions;
+
+  protected Record[] _topRecords;
+
+  private Record[] _records;
+  private int _numMergedBlocks;
+  private final int _desiredNumMergedBlocks;
+  private int _nextIdx;
+  private final Comparator<Record> _comparator;
+  protected final int _numThreadsExtractFinalResult;
+  protected final int _chunkSizeExtractFinalResult;
+
+  public SortedRecordTable(DataSchema dataSchema, QueryContext queryContext, 
int resultSize,

Review Comment:
   (optional, can be a followup) Seems quite a bit of code is duplicated 
between this class and `IndexedTable`. We can consider extracting the common 
part into `BaseTable`



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java:
##########
@@ -225,13 +225,16 @@ public void 
testMSQEGroupsTrimmedAtSegmentLevelWithOrderByOnAggregateIsNotSafe()
     ResultSetGroup result = conn.execute(options + query);
     assertTrimFlagSet(result);
 
-    assertEquals(toResultStr(result),
-        "\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"EXPR$2\"[\"LONG\"]\n"
-            + "77,\t377,\t4\n"
-            + "66,\t566,\t4\n"
-            + "39,\t339,\t4\n"
-            + "96,\t396,\t4\n"
-            + "25,\t25,\t4");
+    String[] lines = toResultStr(result).split("\n");

Review Comment:
   Why do we need to change this? Is this query affected?



##########
pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java:
##########
@@ -212,4 +217,31 @@ private static IndexedTable 
getTrimEnabledIndexedTable(DataSchema dataSchema, bo
           initialCapacity, executorService);
     }
   }
+
+  /**
+   * do sort aggregate when is safeTrim (order by group keys with no having 
clause)
+   * and limit is smaller than threshold
+   * TODO: we also want to do sort aggregate under order by group key with 
having case,
+   *   in this case we can check if the calculated Server trimSize is < 
sortAggregateLimitThreshold
+   *   if so, we do sort aggregate and trim to trimSize during combine.
+   *   This requires extracting Server trimSize calculation logic into 
QueryContext as pre-req
+   */
+  public static boolean shouldSortAggregateUnderSafeTrim(QueryContext 
queryContext) {
+    return !queryContext.isUnsafeTrim() && queryContext.getLimit() < 
queryContext.getSortAggregateLimitThreshold();
+  }
+
+  public static SortedRecordTable 
getAndPopulateSortedRecordTable(GroupByResultsBlock block,
+      QueryContext queryContext, int resultSize,
+      ExecutorService executorService, int desiredNumMergedBlocks, 
Comparator<Record> recordKeyComaparator) {

Review Comment:
   (typo)
   ```suggestion
         ExecutorService executorService, int desiredNumMergedBlocks, 
Comparator<Record> recordKeyComparator) {
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java:
##########
@@ -58,6 +63,109 @@ public static Comparator<Object[]> 
getComparator(List<OrderByExpressionContext>
     return getComparator(orderByExpressions, nullHandlingEnabled, 0, 
orderByExpressions.size());
   }
 
+  /**
+   * get orderBy expressions on the groupBy keys when orderBy keys match 
groupBy keys
+   */
+  public static Comparator<Record> 
getRecordKeyComparator(List<OrderByExpressionContext> orderByExpressions,
+      List<ExpressionContext> groupByExpressions, boolean nullHandlingEnabled) 
{
+    List<OrderByExpressionWithIndex> groupKeyOrderByExpressions =
+        
getGroupKeyOrderByExpressionFromRowOrderByExpressions(orderByExpressions, 
groupByExpressions);
+    Comparator<Object[]> valueComparator = 
getComparatorWithIndex(groupKeyOrderByExpressions, nullHandlingEnabled);
+    return (k1, k2) -> valueComparator.compare(k1.getValues(), k2.getValues());
+  }
+
+  private static Map<String, Integer> 
getGroupByExpressionIndexMap(List<ExpressionContext> groupByExpressions) {
+    Map<String, Integer> groupByExpressionIndexMap = new HashMap<>();
+    int numGroupByExpressions = groupByExpressions.size();
+    for (int i = 0; i < numGroupByExpressions; i++) {
+      groupByExpressionIndexMap.put(groupByExpressions.get(i).getIdentifier(), 
i);
+    }
+    return groupByExpressionIndexMap;
+  }
+
+  /**
+   * orderby expression with an index with respect to its position in the 
group keys
+   */
+  public static class OrderByExpressionWithIndex {

Review Comment:
   Should this be publicly accessible or just an inner helper class?



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java:
##########
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+
+
+/**
+ * Util class used for merging of sorted group-by aggregation
+ */
+public class SortedRecordTable extends BaseTable {

Review Comment:
   Annotate it with `@SuppressWarnings({"rawtypes", "unchecked"})`. You may 
also follow similar checks (e.g. `assert`) in `IndexedTable` to address 
warnings from the IDE



##########
pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java:
##########
@@ -58,6 +63,109 @@ public static Comparator<Object[]> 
getComparator(List<OrderByExpressionContext>
     return getComparator(orderByExpressions, nullHandlingEnabled, 0, 
orderByExpressions.size());
   }
 
+  /**
+   * get orderBy expressions on the groupBy keys when orderBy keys match 
groupBy keys
+   */
+  public static Comparator<Record> 
getRecordKeyComparator(List<OrderByExpressionContext> orderByExpressions,
+      List<ExpressionContext> groupByExpressions, boolean nullHandlingEnabled) 
{
+    List<OrderByExpressionWithIndex> groupKeyOrderByExpressions =
+        
getGroupKeyOrderByExpressionFromRowOrderByExpressions(orderByExpressions, 
groupByExpressions);
+    Comparator<Object[]> valueComparator = 
getComparatorWithIndex(groupKeyOrderByExpressions, nullHandlingEnabled);
+    return (k1, k2) -> valueComparator.compare(k1.getValues(), k2.getValues());
+  }
+
+  private static Map<String, Integer> 
getGroupByExpressionIndexMap(List<ExpressionContext> groupByExpressions) {
+    Map<String, Integer> groupByExpressionIndexMap = new HashMap<>();
+    int numGroupByExpressions = groupByExpressions.size();
+    for (int i = 0; i < numGroupByExpressions; i++) {
+      groupByExpressionIndexMap.put(groupByExpressions.get(i).getIdentifier(), 
i);
+    }
+    return groupByExpressionIndexMap;
+  }
+
+  /**
+   * orderby expression with an index with respect to its position in the 
group keys
+   */
+  public static class OrderByExpressionWithIndex {
+    OrderByExpressionContext _orderByExpressionContext;
+    Integer _index;

Review Comment:
   Should `_index` be a primitive `int`? I don't think we can tolerate `null` here.
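
   A minimal sketch of what a primitive index could look like, assuming the index is looked up from a map like the one built in `getGroupByExpressionIndexMap`. The class `ExpressionWithIndex` and the method `resolve` are hypothetical stand-ins, not the PR's code; the point is only that the `null` check happens once at lookup time, so the field itself can never be `null`.

```java
import java.util.HashMap;
import java.util.Map;

public class PrimitiveIndexSketch {
  // Hypothetical stand-in for OrderByExpressionWithIndex, reduced to the fields that matter here
  static final class ExpressionWithIndex {
    private final String _expression;
    private final int _index; // primitive: cannot hold null, no unboxing at compare time

    ExpressionWithIndex(String expression, int index) {
      _expression = expression;
      _index = index;
    }

    String getExpression() {
      return _expression;
    }

    int getIndex() {
      return _index;
    }
  }

  // Resolves the position of an expression in the group keys and fails fast when it is missing
  static ExpressionWithIndex resolve(String expression, Map<String, Integer> indexMap) {
    Integer index = indexMap.get(expression);
    if (index == null) {
      throw new IllegalArgumentException("Not a group-by key: " + expression);
    }
    return new ExpressionWithIndex(expression, index);
  }

  public static void main(String[] args) {
    Map<String, Integer> indexMap = new HashMap<>();
    indexMap.put("i", 0);
    indexMap.put("j", 1);
    System.out.println(resolve("j", indexMap).getIndex());
  }
}
```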



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java:
##########
@@ -225,13 +225,16 @@ public void 
testMSQEGroupsTrimmedAtSegmentLevelWithOrderByOnAggregateIsNotSafe()
     ResultSetGroup result = conn.execute(options + query);
     assertTrimFlagSet(result);
 
-    assertEquals(toResultStr(result),
-        "\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"EXPR$2\"[\"LONG\"]\n"
-            + "77,\t377,\t4\n"
-            + "66,\t566,\t4\n"
-            + "39,\t339,\t4\n"
-            + "96,\t396,\t4\n"
-            + "25,\t25,\t4");
+    String[] lines = toResultStr(result).split("\n");
+
+    // Assert the header exactly
+    assertEquals(lines[0], 
"\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"EXPR$2\"[\"LONG\"]");
+    // Assert col3 of all data rows is 4
+    for (int i = 1; i < lines.length; i++) {
+      String[] cols = lines[i].split("\t");
+      assertEquals("4", cols[2]);

Review Comment:
   The first argument should be `actual`, i.e. `assertEquals(cols[2], "4")`.
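
   To illustrate why the order matters, here is a small self-contained sketch. The helper `assertEquals` below is a hypothetical stand-in that only mirrors the `(actual, expected)` argument order of TestNG's `Assert.assertEquals`; its message format is an approximation, and the sample row is made up.

```java
import java.util.Objects;

public class AssertOrderSketch {
  // Hypothetical helper that mirrors the (actual, expected) order of TestNG's Assert.assertEquals
  static void assertEquals(Object actual, Object expected) {
    if (!Objects.equals(actual, expected)) {
      throw new AssertionError("expected [" + expected + "] but found [" + actual + "]");
    }
  }

  // Returns the failure message for a comparison, or null when the comparison passes
  static String failureMessage(Object actual, Object expected) {
    try {
      assertEquals(actual, expected);
      return null;
    } catch (AssertionError e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    // Made-up data row whose third column is wrong on purpose
    String[] cols = "25,\t25,\t5".split("\t");
    // Swapped arguments: the message treats the literal "4" as the value found
    System.out.println(failureMessage("4", cols[2]));
    // Correct order: the message reports the value read from the result as found
    System.out.println(failureMessage(cols[2], "4"));
  }
}
```

   With the arguments swapped, a failure reads as if the literal `4` were the value produced by the query, which is misleading when debugging.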



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java:
##########
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SortedRecordTable;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryErrorMessage;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.spi.trace.Tracing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Combine operator for group-by queries.

Review Comment:
   Please add some javadoc about the algorithm
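
   As a reference point for that javadoc, the general idea of combining two key-sorted partial group-by results can be sketched as a two-pointer merge that stops once the limit is reached. This is a generic illustration under my own assumptions (a single `int` key, a single `long` aggregate, unique keys per input); `Row` and `merge` are hypothetical names, and the PR's `SortedRecordTable` may work differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SortedMergeSketch {
  // Hypothetical record: one group key plus one partial aggregate (e.g. a count)
  static final class Row {
    final int _key;
    final long _count;

    Row(int key, long count) {
      _key = key;
      _count = count;
    }
  }

  // Merges two inputs sorted by ascending key, summing aggregates for equal keys,
  // and stops as soon as `limit` groups have been produced
  static List<Row> merge(List<Row> left, List<Row> right, int limit) {
    List<Row> merged = new ArrayList<>();
    int l = 0;
    int r = 0;
    while (merged.size() < limit && (l < left.size() || r < right.size())) {
      if (r == right.size() || (l < left.size() && left.get(l)._key < right.get(r)._key)) {
        merged.add(left.get(l++));
      } else if (l == left.size() || right.get(r)._key < left.get(l)._key) {
        merged.add(right.get(r++));
      } else {
        // Same key on both sides: combine the partial aggregates
        merged.add(new Row(left.get(l)._key, left.get(l)._count + right.get(r)._count));
        l++;
        r++;
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    List<Row> left = Arrays.asList(new Row(1, 2), new Row(3, 1));
    List<Row> right = Arrays.asList(new Row(1, 5), new Row(2, 4), new Row(3, 3));
    for (Row row : merge(left, right, 2)) {
      System.out.println(row._key + " -> " + row._count);
    }
  }
}
```

   Because both inputs are sorted on the same keys, every group emitted before the limit is already final, so the rest of the inputs never needs to be read.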



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java:
##########
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SortedRecordTable;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryErrorMessage;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.spi.trace.Tracing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Combine operator for group-by queries.
+ */
+@SuppressWarnings("rawtypes")
+public class SortedGroupByCombineOperator extends 
BaseSingleBlockCombineOperator<GroupByResultsBlock> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SortedGroupByCombineOperator.class);
+  private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY";
+
+  // We use a CountDownLatch to track if all Futures are finished by the query 
timeout, and cancel the unfinished
+  // _futures (try to interrupt the execution if it already started).
+  private final CountDownLatch _operatorLatch;
+
+  private volatile boolean _groupsTrimmed;
+  private volatile boolean _numGroupsLimitReached;
+  private volatile boolean _numGroupsWarningLimitReached;
+
+  private final AtomicReference<SortedRecordTable> _waitingTable;
+  private final AtomicReference<SortedRecordTable> _satisfiedTable;
+  private final Comparator<Record> _recordKeyComparator;
+
+  public SortedGroupByCombineOperator(List<Operator> operators, QueryContext 
queryContext,
+      ExecutorService executorService) {
+    super(null, operators, overrideMaxExecutionThreads(queryContext, 
operators.size()), executorService);
+
+    assert (GroupByUtils.shouldSortAggregateUnderSafeTrim(queryContext));
+    _operatorLatch = new CountDownLatch(_numTasks);
+    _waitingTable = new AtomicReference<>();
+    _satisfiedTable = new AtomicReference<>();
+    _recordKeyComparator = 
OrderByComparatorFactory.getRecordKeyComparator(queryContext.getOrderByExpressions(),
+        queryContext.getGroupByExpressions(), 
queryContext.isNullHandlingEnabled());
+  }
+
+  /**
+   * For group-by queries, when maxExecutionThreads is not explicitly 
configured, override it to create as many tasks as
+   * the default number of query worker threads (or the number of operators / 
segments if that's lower).
+   */
+  private static QueryContext overrideMaxExecutionThreads(QueryContext 
queryContext, int numOperators) {
+    int maxExecutionThreads = queryContext.getMaxExecutionThreads();
+    if (maxExecutionThreads <= 0) {
+      queryContext.setMaxExecutionThreads(Math.min(numOperators, 
ResourceManager.DEFAULT_QUERY_WORKER_THREADS));
+    }
+    return queryContext;
+  }
+
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  /**
+   * Executes query on one sorted segment in a worker thread and merges the 
results into the sorted record table.
+   */
+  @Override
+  protected void processSegments() {
+    int operatorId;
+    while (_processingException.get() == null && (operatorId = 
_nextOperatorId.getAndIncrement()) < _numOperators) {
+      Operator operator = _operators.get(operatorId);
+      try {
+        if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+          ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+        }
+        GroupByResultsBlock resultsBlock = (GroupByResultsBlock) 
operator.nextBlock();
+        if (resultsBlock.isGroupsTrimmed()) {
+          _groupsTrimmed = true;
+        }
+        // Set groups limit reached flag.
+        if (resultsBlock.isNumGroupsLimitReached()) {
+          _numGroupsLimitReached = true;
+        }
+        if (resultsBlock.isNumGroupsWarningLimitReached()) {
+          _numGroupsWarningLimitReached = true;
+        }
+        // short-circuit one segment case
+        if (_numOperators == 1) {

Review Comment:
   We will probably never hit this given that we have sequential combine. Let's 
make sure the single-operator case is handled by sequential combine.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java:
##########
@@ -347,7 +368,16 @@ public List<IntermediateRecord> 
trimInSegmentResults(GroupKeyGenerator groupKeyG
       }
     }
 
-    return Arrays.asList(heap);
+    for (int i = heap.length; i > 0; i--) {

Review Comment:
   (MAJOR) This can add overhead in other scenarios where sorting is not 
required. Let's add a boolean to control whether to sort the result.
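
   A rough sketch of the shape I have in mind, using plain integers instead of `IntermediateRecord`. The method `trim` and its `sortResult` parameter are hypothetical names, and this uses `java.util.PriorityQueue` rather than the array-based heap in `TableResizer`; it only shows that callers that do not need ordering skip the extra sort.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class OptionalSortSketch {
  // Keeps the best `size` values under `comparator` and sorts them only when `sortResult` is set
  static List<Integer> trim(List<Integer> values, int size, Comparator<Integer> comparator, boolean sortResult) {
    // Reversed comparator puts the worst retained value at the head, so it is evicted first
    PriorityQueue<Integer> heap = new PriorityQueue<>(comparator.reversed());
    for (Integer value : values) {
      heap.offer(value);
      if (heap.size() > size) {
        heap.poll();
      }
    }
    // Copying the heap yields the retained values in heap order, not sorted order
    List<Integer> result = new ArrayList<>(heap);
    if (sortResult) {
      // Pay the extra O(n log n) only for callers that require ordered output
      result.sort(comparator);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Integer> values = Arrays.asList(5, 1, 4, 2, 3);
    System.out.println(trim(values, 3, Comparator.<Integer>naturalOrder(), true));
    System.out.println(trim(values, 3, Comparator.<Integer>naturalOrder(), false).size());
  }
}
```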



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

