This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch acquire_release in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 0f3ff1918fafad93149406d4f61f1d21ee959fd8 Author: Neha Pawar <[email protected]> AuthorDate: Mon Sep 20 20:16:17 2021 -0700 Acquire-release --- .../AcquireReleaseColumnsSegmentOperator.java | 25 +++++++++++++--------- .../core/operator/combine/BaseCombineOperator.java | 11 +++++++++- .../combine/GroupByOrderByCombineOperator.java | 17 ++++++++++----- .../plan/AcquireReleaseColumnsSegmentPlanNode.java | 2 +- 4 files changed, 38 insertions(+), 17 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java index 127f38f..04dc79b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java @@ -20,6 +20,7 @@ package org.apache.pinot.core.operator; import org.apache.pinot.core.common.Block; import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.plan.PlanNode; import org.apache.pinot.segment.spi.FetchContext; import org.apache.pinot.segment.spi.IndexSegment; @@ -31,13 +32,13 @@ import org.apache.pinot.segment.spi.IndexSegment; public class AcquireReleaseColumnsSegmentOperator extends BaseOperator { private static final String OPERATOR_NAME = "AcquireReleaseColumnsSegmentOperator"; - private final Operator _childOperator; + private final PlanNode _planNode; private final IndexSegment _indexSegment; private final FetchContext _fetchContext; + private Operator _childOperator; - public AcquireReleaseColumnsSegmentOperator(Operator childOperator, IndexSegment indexSegment, - FetchContext fetchContext) { - _childOperator = childOperator; + public AcquireReleaseColumnsSegmentOperator(PlanNode planNode, IndexSegment indexSegment, FetchContext fetchContext) { + _planNode = planNode; _indexSegment = indexSegment; _fetchContext = fetchContext; } @@ -49,12 +50,16 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator { */ @Override protected Block getNextBlock() { + _childOperator = _planNode.run(); + return _childOperator.nextBlock(); + } + + public void acquire() { _indexSegment.acquire(_fetchContext); - try { - return _childOperator.nextBlock(); - } finally { - _indexSegment.release(_fetchContext); - } + } + + public void release() { + _indexSegment.release(_fetchContext); } @Override @@ -64,6 +69,6 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator { @Override public ExecutionStatistics getExecutionStatistics() { - return _childOperator.getExecutionStatistics(); + return _childOperator == null ? new ExecutionStatistics(0, 0, 0, 0) : _childOperator.getExecutionStatistics(); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index eeb27c2..89c6f1f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; import org.apache.pinot.core.query.request.context.QueryContext; @@ -146,8 +147,12 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul */ protected void processSegments(int taskIndex) { for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) { + Operator operator = _operators.get(operatorIndex); try { - IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock(); + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { + ((AcquireReleaseColumnsSegmentOperator) operator).acquire(); + } + IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) operator.nextBlock(); if (isQuerySatisfied(resultsBlock)) { // Query is satisfied, skip processing the remaining segments _blockingQueue.offer(resultsBlock); @@ -164,6 +169,10 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul e); _blockingQueue.offer(new IntermediateResultsBlock(e)); return; + } finally { + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { + ((AcquireReleaseColumnsSegmentOperator) operator).release(); + } } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java index b2d9373..d6c5dac 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java @@ -41,6 +41,7 @@ import org.apache.pinot.core.data.table.IntermediateRecord; import org.apache.pinot.core.data.table.Key; import org.apache.pinot.core.data.table.Record; import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable; +import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; @@ -125,9 +126,12 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator { */ @Override protected void processSegments(int threadIndex) { + Operator operator = _operators.get(threadIndex); try { - IntermediateResultsBlock intermediateResultsBlock = - (IntermediateResultsBlock) _operators.get(threadIndex).nextBlock(); + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { + ((AcquireReleaseColumnsSegmentOperator) operator).acquire(); + } + IntermediateResultsBlock intermediateResultsBlock = (IntermediateResultsBlock) operator.nextBlock(); _initLock.lock(); try { @@ -186,9 +190,12 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator { // Early-terminated because query times out or is already satisfied } catch (Exception e) { LOGGER.error("Caught exception while processing and combining group-by order-by for index: {}, operator: {}, " - + "queryContext: {}", threadIndex, _operators.get(threadIndex).getClass().getName(), _queryContext, e); + + "queryContext: {}", threadIndex, operator.getClass().getName(), _queryContext, e); _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); } finally { + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { + ((AcquireReleaseColumnsSegmentOperator) operator).release(); + } _operatorLatch.countDown(); } } @@ -213,8 +220,8 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator { boolean opCompleted = _operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS); if (!opCompleted) { // If this happens, the broker side should already timed out, just log the error and return - String errorMessage = - String.format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs, + String errorMessage = String + .format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs, _queryContext); LOGGER.error(errorMessage); return new IntermediateResultsBlock(new TimeoutException(errorMessage)); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java index 5a9f506..5e517e1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java @@ -41,6 +41,6 @@ public class AcquireReleaseColumnsSegmentPlanNode implements PlanNode { @Override public AcquireReleaseColumnsSegmentOperator run() { - return new AcquireReleaseColumnsSegmentOperator(_childPlanNode.run(), _indexSegment, _fetchContext); + return new AcquireReleaseColumnsSegmentOperator(_childPlanNode, _indexSegment, _fetchContext); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
