This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6e4fd1285a Remove the unused streaming combine operator (#14718)
6e4fd1285a is described below
commit 6e4fd1285ad5c3d0f394c56dd5ed9665e4c62e72
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Dec 25 18:08:29 2024 -0800
Remove the unused streaming combine operator (#14718)
---
.../StreamingAggregationCombineOperator.java | 45 ----
.../StreamingDistinctCombineOperator.java | 45 ----
.../streaming/StreamingGroupByCombineOperator.java | 238 ---------------------
.../StreamingSelectionOrderByCombineOperator.java | 45 ----
4 files changed, 373 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingAggregationCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingAggregationCombineOperator.java
deleted file mode 100644
index ff5820611d..0000000000
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingAggregationCombineOperator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.streaming;
-
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
-import
org.apache.pinot.core.operator.combine.merger.AggregationResultsBlockMerger;
-import org.apache.pinot.core.query.request.context.QueryContext;
-
-
-/**
- * Combine operator for aggregation queries with streaming response.
- */
-@SuppressWarnings("rawtypes")
-public class StreamingAggregationCombineOperator extends
BaseStreamingCombineOperator<AggregationResultsBlock> {
- private static final String EXPLAIN_NAME = "STREAMING_COMBINE_AGGREGATE";
-
- public StreamingAggregationCombineOperator(List<Operator> operators,
QueryContext queryContext,
- ExecutorService executorService) {
- super(new AggregationResultsBlockMerger(queryContext), operators,
queryContext, executorService);
- }
-
- @Override
- public String toExplainString() {
- return EXPLAIN_NAME;
- }
-}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingDistinctCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingDistinctCombineOperator.java
deleted file mode 100644
index 6834e30145..0000000000
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingDistinctCombineOperator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.streaming;
-
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
-import
org.apache.pinot.core.operator.combine.merger.DistinctResultsBlockMerger;
-import org.apache.pinot.core.query.request.context.QueryContext;
-
-
-/**
- * Combine operator for distinct queries with streaming response.
- */
-@SuppressWarnings("rawtypes")
-public class StreamingDistinctCombineOperator extends
BaseStreamingCombineOperator<DistinctResultsBlock> {
- private static final String EXPLAIN_NAME = "STREAMING_COMBINE_DISTINCT";
-
- public StreamingDistinctCombineOperator(List<Operator> operators,
QueryContext queryContext,
- ExecutorService executorService) {
- super(new DistinctResultsBlockMerger(queryContext), operators,
queryContext, executorService);
- }
-
- @Override
- public String toExplainString() {
- return EXPLAIN_NAME;
- }
-}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
deleted file mode 100644
index 13b06ae6f4..0000000000
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.streaming;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.data.table.IndexedTable;
-import org.apache.pinot.core.data.table.IntermediateRecord;
-import org.apache.pinot.core.data.table.Key;
-import org.apache.pinot.core.data.table.Record;
-import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
-import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
-import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
-import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
-import org.apache.pinot.core.operator.blocks.results.MetadataResultsBlock;
-import org.apache.pinot.core.operator.combine.CombineOperatorUtils;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
-import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
-import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
-import org.apache.pinot.core.util.GroupByUtils;
-import org.apache.pinot.spi.exception.EarlyTerminationException;
-import org.apache.pinot.spi.trace.Tracing;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Combine operator for group-by queries.
- * TODO: Use CombineOperatorUtils.getNumThreadsForQuery() to get the
parallelism of the query instead of using
- * all threads
- */
-@SuppressWarnings("rawtypes")
-public class StreamingGroupByCombineOperator extends
BaseStreamingCombineOperator<GroupByResultsBlock> {
- public static final int MAX_TRIM_THRESHOLD = 1_000_000_000;
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(StreamingGroupByCombineOperator.class);
- private static final String EXPLAIN_NAME = "STREAMING_COMBINE_GROUP_BY";
-
- private final int _numAggregationFunctions;
- private final int _numGroupByExpressions;
- private final int _numColumns;
- // We use a CountDownLatch to track if all Futures are finished by the query
timeout, and cancel the unfinished
- // _futures (try to interrupt the execution if it already started).
- private final CountDownLatch _operatorLatch;
- private boolean _opCompleted;
-
- private volatile IndexedTable _indexedTable;
- private volatile boolean _numGroupsLimitReached;
-
- public StreamingGroupByCombineOperator(List<Operator> operators,
QueryContext queryContext,
- ExecutorService executorService) {
- super(null, operators, overrideMaxExecutionThreads(queryContext,
operators.size()), executorService);
-
- AggregationFunction[] aggregationFunctions =
_queryContext.getAggregationFunctions();
- assert aggregationFunctions != null;
- _numAggregationFunctions = aggregationFunctions.length;
- assert _queryContext.getGroupByExpressions() != null;
- _numGroupByExpressions = _queryContext.getGroupByExpressions().size();
- _numColumns = _numGroupByExpressions + _numAggregationFunctions;
- _operatorLatch = new CountDownLatch(_numTasks);
- _opCompleted = false;
- }
-
- @Override
- protected BaseResultsBlock getNextBlock() {
- if (!_opCompleted) {
- try {
- return getFinalResult();
- } catch (InterruptedException e) {
- throw new EarlyTerminationException("Interrupted while merging results
blocks", e);
- } catch (Exception e) {
- LOGGER.error("Caught exception while merging results blocks (query:
{})", _queryContext, e);
- return new
ExceptionResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR,
e));
- }
- }
- // Setting the execution stats for the final return
- BaseResultsBlock finalBlock = new MetadataResultsBlock();
- int numServerThreads = Math.min(_numTasks,
ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
- CombineOperatorUtils.setExecutionStatistics(finalBlock, _operators,
_totalWorkerThreadCpuTimeNs.get(),
- numServerThreads);
- return finalBlock;
- }
-
- /**
- * For group-by queries, when maxExecutionThreads is not explicitly
configured, create one task per operator.
- */
- private static QueryContext overrideMaxExecutionThreads(QueryContext
queryContext, int numOperators) {
- int maxExecutionThreads = queryContext.getMaxExecutionThreads();
- if (maxExecutionThreads <= 0) {
- queryContext.setMaxExecutionThreads(numOperators);
- }
- return queryContext;
- }
-
- @Override
- public String toExplainString() {
- return EXPLAIN_NAME;
- }
-
- /**
- * Executes query on one segment in a worker thread and merges the results
into the indexed table.
- */
- @Override
- public void processSegments() {
- int operatorId;
- while (_processingException.get() == null && (operatorId =
_nextOperatorId.getAndIncrement()) < _numOperators) {
- Operator operator = _operators.get(operatorId);
- try {
- if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
- ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
- }
- GroupByResultsBlock resultsBlock = (GroupByResultsBlock)
operator.nextBlock();
- if (_indexedTable == null) {
- synchronized (this) {
- if (_indexedTable == null) {
- _indexedTable =
GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext,
_numTasks);
- }
- }
- }
-
- // Set groups limit reached flag.
- if (resultsBlock.isNumGroupsLimitReached()) {
- _numGroupsLimitReached = true;
- }
-
- // Merge aggregation group-by result.
- // Iterate over the group-by keys, for each key, update the group-by
result in the indexedTable
- Collection<IntermediateRecord> intermediateRecords =
resultsBlock.getIntermediateRecords();
- // Count the number of merged keys
- int mergedKeys = 0;
- // For now, only GroupBy OrderBy query has pre-constructed
intermediate records
- if (intermediateRecords == null) {
- // Merge aggregation group-by result.
- AggregationGroupByResult aggregationGroupByResult =
resultsBlock.getAggregationGroupByResult();
- if (aggregationGroupByResult != null) {
- // Iterate over the group-by keys, for each key, update the
group-by result in the indexedTable
- Iterator<GroupKeyGenerator.GroupKey> dicGroupKeyIterator =
aggregationGroupByResult.getGroupKeyIterator();
- while (dicGroupKeyIterator.hasNext()) {
- GroupKeyGenerator.GroupKey groupKey = dicGroupKeyIterator.next();
- Object[] keys = groupKey._keys;
- Object[] values = Arrays.copyOf(keys, _numColumns);
- int groupId = groupKey._groupId;
- for (int i = 0; i < _numAggregationFunctions; i++) {
- values[_numGroupByExpressions + i] =
aggregationGroupByResult.getResultForGroupId(i, groupId);
- }
- _indexedTable.upsert(new Key(keys), new Record(values));
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(mergedKeys);
- mergedKeys++;
- }
- }
- } else {
- for (IntermediateRecord intermediateResult : intermediateRecords) {
- //TODO: change upsert api so that it accepts intermediateRecord
directly
- _indexedTable.upsert(intermediateResult._key,
intermediateResult._record);
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(mergedKeys);
- mergedKeys++;
- }
- }
- } catch (RuntimeException e) {
- throw wrapOperatorException(operator, e);
- } finally {
- if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
- ((AcquireReleaseColumnsSegmentOperator) operator).release();
- }
- }
- }
- }
-
- // TODO: combine this with the single block group by combine operator
- private BaseResultsBlock getFinalResult()
- throws InterruptedException {
- long timeoutMs = _queryContext.getEndTimeMs() - System.currentTimeMillis();
- _opCompleted = _operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
- if (!_opCompleted) {
- // If this happens, the broker side should already timed out, just log
the error and return
- String errorMessage =
- String.format("Timed out while combining group-by order-by results
after %dms, queryContext = %s", timeoutMs,
- _queryContext);
- LOGGER.error(errorMessage);
- return new ExceptionResultsBlock(new TimeoutException(errorMessage));
- }
-
- Throwable processingException = _processingException.get();
- if (processingException != null) {
- return new ExceptionResultsBlock(processingException);
- }
-
- IndexedTable indexedTable = _indexedTable;
- if (_queryContext.isServerReturnFinalResult()) {
- indexedTable.finish(true, true);
- } else if (_queryContext.isServerReturnFinalResultKeyUnpartitioned()) {
- indexedTable.finish(false, true);
- } else {
- indexedTable.finish(false);
- }
- GroupByResultsBlock mergedBlock = new GroupByResultsBlock(indexedTable,
_queryContext);
- mergedBlock.setNumGroupsLimitReached(_numGroupsLimitReached);
- mergedBlock.setNumResizes(indexedTable.getNumResizes());
- mergedBlock.setResizeTimeMs(indexedTable.getResizeTimeMs());
- return mergedBlock;
- }
-
- @Override
- public void onProcessSegmentsException(Throwable t) {
- _processingException.compareAndSet(null, t);
- }
-
- @Override
- public void onProcessSegmentsFinish() {
- _operatorLatch.countDown();
- }
-}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOrderByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOrderByCombineOperator.java
deleted file mode 100644
index 064b2ebf3a..0000000000
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOrderByCombineOperator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.streaming;
-
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
-import
org.apache.pinot.core.operator.combine.merger.SelectionOrderByResultsBlockMerger;
-import org.apache.pinot.core.query.request.context.QueryContext;
-
-
-/**
- * Combine operator for selection queries with order-by, with streaming
response.
- */
-@SuppressWarnings("rawtypes")
-public class StreamingSelectionOrderByCombineOperator extends
BaseStreamingCombineOperator<SelectionResultsBlock> {
- private static final String EXPLAIN_NAME =
"STREAMING_COMBINE_SELECT_ORDERBY";
-
- public StreamingSelectionOrderByCombineOperator(List<Operator> operators,
QueryContext queryContext,
- ExecutorService executorService) {
- super(new SelectionOrderByResultsBlockMerger(queryContext), operators,
queryContext, executorService);
- }
-
- @Override
- public String toExplainString() {
- return EXPLAIN_NAME;
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]