gortiz commented on code in PR #13733:
URL: https://github.com/apache/pinot/pull/13733#discussion_r1747205731
##########
pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java:
##########
@@ -691,4 +662,132 @@ private void addPrunerStats(InstanceResponseBlock
instanceResponse, SegmentPrune
instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(),
String.valueOf(prunerStats.getValuePruned()));
}
+
+ private List<IndexSegment> selectSegments(List<IndexSegment> indexSegments,
QueryContext queryContext,
+ TimerContext timerContext, ExecutorService executorService,
SegmentPrunerStatistics prunerStats) {
+ List<IndexSegment> selectedSegments;
+ if ((queryContext.getFilter() != null &&
queryContext.getFilter().isConstantFalse()) || (
+ queryContext.getHavingFilter() != null &&
queryContext.getHavingFilter().isConstantFalse())) {
+ selectedSegments = Collections.emptyList();
+ } else {
+ TimerContext.Timer segmentPruneTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
+ selectedSegments = _segmentPrunerService.prune(indexSegments,
queryContext, prunerStats, executorService);
+ segmentPruneTimer.stopAndRecord();
+ }
+ return selectedSegments;
+ }
+
+ private Plan planCombineQuery(QueryContext queryContext, TimerContext
timerContext, ExecutorService executorService,
+ @Nullable ResultsBlockStreamer streamer, List<SegmentContext>
selectedSegmentContexts) {
+ TimerContext.Timer planBuildTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
+
+ Plan queryPlan;
+ if (streamer != null) {
+ queryPlan =
_planMaker.makeStreamingInstancePlan(selectedSegmentContexts, queryContext,
executorService,
+ streamer, _serverMetrics);
+ } else {
+ queryPlan = _planMaker.makeInstancePlan(selectedSegmentContexts,
queryContext, executorService, _serverMetrics);
+ }
+ planBuildTimer.stopAndRecord();
+ return queryPlan;
+ }
+
+ private InstanceResponseBlock execute(TableDataManager tableDataManager,
+ List<IndexSegment> indexSegments, QueryContext queryContext,
TimerContext timerContext,
+ ExecutorService executorService, ResultsBlockStreamer streamer, boolean
enableStreaming,
+ List<IndexSegment> selectedSegments, List<SegmentContext>
selectedSegmentContexts)
+ throws TimeoutException {
+ InstanceResponseBlock instanceResponse;
+ @Nullable
+ ResultsBlockStreamer actualStreamer = enableStreaming ? streamer : null;
+ switch (queryContext.getExplain()) {
+ case DESCRIPTION:
+ instanceResponse = executeDescribeExplain(indexSegments, queryContext,
timerContext, executorService,
+ actualStreamer, selectedSegmentContexts);
+ break;
+ case NODE:
+ instanceResponse = executeNodeExplain(queryContext, timerContext,
executorService, actualStreamer,
+ selectedSegmentContexts);
+ break;
+ case NONE:
+ instanceResponse = executeQuery(queryContext, timerContext,
executorService, actualStreamer,
+ selectedSegmentContexts);
+ break;
+ default:
+ throw new IllegalStateException("Unsupported explain mode: " +
queryContext.getExplain());
+ }
+ return instanceResponse;
+ }
+
+ private InstanceResponseBlock executeQuery(QueryContext queryContext,
TimerContext timerContext,
+ ExecutorService executorService, @Nullable ResultsBlockStreamer streamer,
+ List<SegmentContext> selectedSegmentContexts)
+ throws TimeoutException {
+ if (selectedSegmentContexts.isEmpty()) {
+ return new
InstanceResponseBlock(ResultsBlockUtils.buildEmptyQueryResults(queryContext));
+ }
+ Plan queryPlan = planCombineQuery(queryContext, timerContext,
executorService, streamer, selectedSegmentContexts);
+
+ TimerContext.Timer planExecTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
+ InstanceResponseBlock instanceResponse = queryPlan.execute();
+ planExecTimer.stopAndRecord();
+ return instanceResponse;
+ }
+
+ private InstanceResponseBlock executeNodeExplain(QueryContext queryContext,
TimerContext timerContext,
+ ExecutorService executorService, @Nullable ResultsBlockStreamer streamer,
+ List<SegmentContext> selectedSegmentContexts) {
+
+ if (selectedSegmentContexts.isEmpty()) {
+ ExplainInfo emptyNode = new
ExplainInfo(ExplainPlanRows.ALL_SEGMENTS_PRUNED_ON_SERVER);
+ ExplainV2ResultBlock explainResults = new
ExplainV2ResultBlock(queryContext, emptyNode);
+
+ return new InstanceResponseBlock(explainResults);
+ }
+
+ Plan queryPlan = planCombineQuery(queryContext, timerContext,
executorService, streamer,
+ selectedSegmentContexts);
+
+ TimerContext.Timer planExecTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
+
+ InstanceResponseBlock instanceResponse;
+ InstanceResponseOperator responseOperator = (InstanceResponseOperator)
queryPlan.getPlanNode().run();
+ try {
+ responseOperator.prefetchAll();
+
+ ExplainInfo explainInfo = responseOperator.getExplainInfo();
+ ExplainV2ResultBlock block = new ExplainV2ResultBlock(queryContext,
explainInfo);
+
+ instanceResponse = new InstanceResponseBlock(block);
+ // TODO: Study if this metadata can/should be added
Review Comment:
Do not forget about this TODO
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]