Jackie-Jiang commented on a change in pull request #6043:
URL: https://github.com/apache/incubator-pinot/pull/6043#discussion_r504232412
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
##########
@@ -277,4 +253,112 @@ public DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService e
LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId, dataTable);
return dataTable;
}
+
+ private DataTable processQuery(List<IndexSegment> indexSegments, QueryContext queryContext, TimerContext timerContext,
+ ExecutorService executorService, @Nullable StreamObserver<Server.ServerResponse> responseObserver, long endTimeMs,
+ boolean enableStreaming)
+ throws Exception {
+ handleSubquery(queryContext, indexSegments, timerContext, executorService, endTimeMs);
+
+ // Compute total docs for the table before pruning the segments
+ long numTotalDocs = 0;
+ for (IndexSegment indexSegment : indexSegments) {
+ numTotalDocs += indexSegment.getSegmentMetadata().getTotalDocs();
+ }
+
+ TimerContext.Timer segmentPruneTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
+ List<IndexSegment> selectedSegments = _segmentPrunerService.prune(indexSegments, queryContext);
+ segmentPruneTimer.stopAndRecord();
+ int numSelectedSegments = selectedSegments.size();
+ LOGGER.debug("Matched {} segments after pruning", numSelectedSegments);
+ if (numSelectedSegments == 0) {
+ // Only return metadata for streaming query
+ DataTable dataTable = enableStreaming ? new DataTableImplV2() : DataTableUtils.buildEmptyDataTable(queryContext);
+ Map<String, String> metadata = dataTable.getMetadata();
+ metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY, String.valueOf(numTotalDocs));
+ metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0");
+ metadata.put(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY, "0");
+ metadata.put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY, "0");
+ metadata.put(DataTable.NUM_SEGMENTS_PROCESSED, "0");
+ metadata.put(DataTable.NUM_SEGMENTS_MATCHED, "0");
+ return dataTable;
+ } else {
+ TimerContext.Timer planBuildTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
+ Plan queryPlan = enableStreaming ? _planMaker
+ .makeStreamingInstancePlan(selectedSegments, queryContext, executorService, responseObserver, endTimeMs)
+ : _planMaker.makeInstancePlan(selectedSegments, queryContext, executorService, endTimeMs);
+ planBuildTimer.stopAndRecord();
+
+ TimerContext.Timer planExecTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
+ DataTable dataTable = queryPlan.execute();
+ planExecTimer.stopAndRecord();
+
+ // Update the total docs in the metadata based on the un-pruned segments
+ dataTable.getMetadata().put(DataTable.TOTAL_DOCS_METADATA_KEY, Long.toString(numTotalDocs));
+
+ return dataTable;
+ }
+ }
+
+ /**
+ * Handles the subquery in the given query.
+ * <p>Currently only supports subquery within IN_PARTITIONED_SUBQUERY function.
+ */
+ private void handleSubquery(QueryContext queryContext, List<IndexSegment> indexSegments, TimerContext timerContext,
+ ExecutorService executorService, long endTimeMs)
+ throws Exception {
+ FilterContext filter = queryContext.getFilter();
+ if (filter != null) {
+ handleSubquery(filter, indexSegments, timerContext, executorService, endTimeMs);
+ }
+ }
+
+ /**
+ * Handles the subquery in the filter.
+ * <p>Currently only supports subquery within IN_PARTITIONED_SUBQUERY function.
+ */
+ private void handleSubquery(FilterContext filter, List<IndexSegment> indexSegments, TimerContext timerContext,
+ ExecutorService executorService, long endTimeMs)
+ throws Exception {
+ Predicate predicate = filter.getPredicate();
+ if (predicate != null) {
+ handleSubquery(predicate.getLhs(), indexSegments, timerContext, executorService, endTimeMs);
+ } else {
+ for (FilterContext child : filter.getChildren()) {
+ handleSubquery(child, indexSegments, timerContext, executorService, endTimeMs);
+ }
+ }
+ }
+
+ /**
+ * Handles the subquery in the given expression.
+ * <p>Currently only supports subquery within IN_PARTITIONED_SUBQUERY function.
+ */
+ private void handleSubquery(ExpressionContext expression, List<IndexSegment> indexSegments, TimerContext timerContext,
Review comment:
Done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]