songwdfu commented on code in PR #16308:
URL: https://github.com/apache/pinot/pull/16308#discussion_r2265404655


##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java:
##########
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SortedRecordTable;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryErrorMessage;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.spi.trace.Tracing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Combine operator for group-by queries.
+ */
+@SuppressWarnings("rawtypes")
+public class SortedGroupByCombineOperator extends 
BaseSingleBlockCombineOperator<GroupByResultsBlock> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SortedGroupByCombineOperator.class);
+  private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY";
+
+  // We use a CountDownLatch to track if all Futures are finished by the query 
timeout, and cancel the unfinished
+  // _futures (try to interrupt the execution if it already started).
+  private final CountDownLatch _operatorLatch;
+
+  private volatile boolean _groupsTrimmed;
+  private volatile boolean _numGroupsLimitReached;
+  private volatile boolean _numGroupsWarningLimitReached;
+
+  private final AtomicReference<SortedRecordTable> _waitingTable;
+  private final AtomicReference<SortedRecordTable> _satisfiedTable;
+  private final Comparator<Record> _recordKeyComparator;
+
+  public SortedGroupByCombineOperator(List<Operator> operators, QueryContext 
queryContext,
+      ExecutorService executorService) {
+    super(null, operators, overrideMaxExecutionThreads(queryContext, 
operators.size()), executorService);
+
+    assert (GroupByUtils.shouldSortAggregateUnderSafeTrim(queryContext));
+    _operatorLatch = new CountDownLatch(_numTasks);
+    _waitingTable = new AtomicReference<>();
+    _satisfiedTable = new AtomicReference<>();
+    _recordKeyComparator = 
OrderByComparatorFactory.getRecordKeyComparator(queryContext.getOrderByExpressions(),
+        queryContext.getGroupByExpressions(), 
queryContext.isNullHandlingEnabled());
+  }
+
+  /**
+   * For group-by queries, when maxExecutionThreads is not explicitly 
configured, override it to create as many tasks as
+   * the default number of query worker threads (or the number of operators / 
segments if that's lower).
+   */
+  private static QueryContext overrideMaxExecutionThreads(QueryContext 
queryContext, int numOperators) {
+    int maxExecutionThreads = queryContext.getMaxExecutionThreads();
+    if (maxExecutionThreads <= 0) {
+      queryContext.setMaxExecutionThreads(Math.min(numOperators, 
ResourceManager.DEFAULT_QUERY_WORKER_THREADS));
+    }
+    return queryContext;
+  }
+
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  /**
+   * Executes query on one sorted segment in a worker thread and merges the 
results into the sorted record table.
+   */
+  @Override
+  protected void processSegments() {
+    int operatorId;
+    while (_processingException.get() == null && (operatorId = 
_nextOperatorId.getAndIncrement()) < _numOperators) {
+      Operator operator = _operators.get(operatorId);
+      try {
+        if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+          ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+        }
+        GroupByResultsBlock resultsBlock = (GroupByResultsBlock) 
operator.nextBlock();
+        if (resultsBlock.isGroupsTrimmed()) {
+          _groupsTrimmed = true;
+        }
+        // Set groups limit reached flag.
+        if (resultsBlock.isNumGroupsLimitReached()) {
+          _numGroupsLimitReached = true;
+        }
+        if (resultsBlock.isNumGroupsWarningLimitReached()) {
+          _numGroupsWarningLimitReached = true;
+        }
+        // short-circuit one segment case
+        if (_numOperators == 1) {
+          _satisfiedTable.set(
+              GroupByUtils.getAndPopulateSortedRecordTable(resultsBlock, 
_queryContext,
+                  _queryContext.getLimit(), _executorService, _numOperators, 
_recordKeyComparator)
+          );
+          break;
+        }
+        // save one call to getAndPopulateLinkedHashMapIndexedTable
+        //  by merging the current block in if there is a waitingTable
+        SortedRecordTable waitingTable = _waitingTable.getAndUpdate(v -> v == 
null

Review Comment:
   > Alternatively, we can also make a class to help sort records (or 
intermediate records), then in the end construct a table with the final records
   
   Just making sure I understood this correctly — does this mean something like 
extracting a `SortedRecords` that is used for the entire pair-wise merging
   ```
   class SortedRecords {
     int _numMergedBlocks;
     final int _desiredNumMergedBlocks;
     Record[] _records;
     int _nextIdx;
     int _resultSize;
   }
   ```
   
   and a `SortedRecordMerger` that performs the merge of two `SortedRecords`?
   ```
   public class SortedRecordMerger {
     private final int _resultSize;
     private final Comparator<Record> _comparator;
     private final int _numKeyColumns;
     private final AggregationFunction[] _aggregationFunctions;
   
     /// Merge one SortedRecords into another SortedRecords
     public SortedRecords mergeSortedRecords(SortedRecords left, SortedRecords right);
   }
   ```
   
   Then in the end we convert the satisfied `SortedRecords` to a 
`SortedRecordTable`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to