kishoreg commented on a change in pull request #6043:
URL: https://github.com/apache/incubator-pinot/pull/6043#discussion_r502756962
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
##########
@@ -277,4 +253,112 @@ public DataTable processQuery(ServerQueryRequest
queryRequest, ExecutorService e
LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId,
dataTable);
return dataTable;
}
+
+ private DataTable processQuery(List<IndexSegment> indexSegments,
QueryContext queryContext, TimerContext timerContext,
+ ExecutorService executorService, @Nullable
StreamObserver<Server.ServerResponse> responseObserver, long endTimeMs,
+ boolean enableStreaming)
+ throws Exception {
+ handleSubquery(queryContext, indexSegments, timerContext, executorService,
endTimeMs);
+
+ // Compute total docs for the table before pruning the segments
+ long numTotalDocs = 0;
+ for (IndexSegment indexSegment : indexSegments) {
+ numTotalDocs += indexSegment.getSegmentMetadata().getTotalDocs();
+ }
+
+ TimerContext.Timer segmentPruneTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
+ List<IndexSegment> selectedSegments =
_segmentPrunerService.prune(indexSegments, queryContext);
+ segmentPruneTimer.stopAndRecord();
+ int numSelectedSegments = selectedSegments.size();
+ LOGGER.debug("Matched {} segments after pruning", numSelectedSegments);
+ if (numSelectedSegments == 0) {
+ // Only return metadata for streaming query
+ DataTable dataTable = enableStreaming ? new DataTableImplV2() :
DataTableUtils.buildEmptyDataTable(queryContext);
+ Map<String, String> metadata = dataTable.getMetadata();
+ metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY,
String.valueOf(numTotalDocs));
+ metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0");
+ metadata.put(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY, "0");
+ metadata.put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY,
"0");
+ metadata.put(DataTable.NUM_SEGMENTS_PROCESSED, "0");
+ metadata.put(DataTable.NUM_SEGMENTS_MATCHED, "0");
+ return dataTable;
+ } else {
+ TimerContext.Timer planBuildTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
+ Plan queryPlan = enableStreaming ? _planMaker
+ .makeStreamingInstancePlan(selectedSegments, queryContext,
executorService, responseObserver, endTimeMs)
+ : _planMaker.makeInstancePlan(selectedSegments, queryContext,
executorService, endTimeMs);
+ planBuildTimer.stopAndRecord();
+
+ TimerContext.Timer planExecTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
+ DataTable dataTable = queryPlan.execute();
+ planExecTimer.stopAndRecord();
+
+ // Update the total docs in the metadata based on the un-pruned segments
+ dataTable.getMetadata().put(DataTable.TOTAL_DOCS_METADATA_KEY,
Long.toString(numTotalDocs));
+
+ return dataTable;
+ }
+ }
+
+ /**
+ * Handles the subquery in the given query.
+ * <p>Currently only supports subquery within IN_PARTITIONED_SUBQUERY
function.
+ */
+ private void handleSubquery(QueryContext queryContext, List<IndexSegment>
indexSegments, TimerContext timerContext,
+ ExecutorService executorService, long endTimeMs)
+ throws Exception {
+ FilterContext filter = queryContext.getFilter();
+ if (filter != null) {
+ handleSubquery(filter, indexSegments, timerContext, executorService,
endTimeMs);
+ }
+ }
+
+ /**
+ * Handles the subquery in the filter.
+ * <p>Currently only supports subquery within IN_PARTITIONED_SUBQUERY
function.
+ */
+ private void handleSubquery(FilterContext filter, List<IndexSegment>
indexSegments, TimerContext timerContext,
+ ExecutorService executorService, long endTimeMs)
+ throws Exception {
+ Predicate predicate = filter.getPredicate();
+ if (predicate != null) {
Review comment:
Add javadocs. Also, isn't it better to check `filter.getChildren()` and
invert the `if` statement?
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
##########
@@ -277,4 +253,112 @@ public DataTable processQuery(ServerQueryRequest
queryRequest, ExecutorService e
LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId,
dataTable);
return dataTable;
}
+
+ private DataTable processQuery(List<IndexSegment> indexSegments,
QueryContext queryContext, TimerContext timerContext,
+ ExecutorService executorService, @Nullable
StreamObserver<Server.ServerResponse> responseObserver, long endTimeMs,
+ boolean enableStreaming)
+ throws Exception {
+ handleSubquery(queryContext, indexSegments, timerContext, executorService,
endTimeMs);
+
+ // Compute total docs for the table before pruning the segments
+ long numTotalDocs = 0;
+ for (IndexSegment indexSegment : indexSegments) {
+ numTotalDocs += indexSegment.getSegmentMetadata().getTotalDocs();
+ }
+
+ TimerContext.Timer segmentPruneTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
+ List<IndexSegment> selectedSegments =
_segmentPrunerService.prune(indexSegments, queryContext);
+ segmentPruneTimer.stopAndRecord();
+ int numSelectedSegments = selectedSegments.size();
+ LOGGER.debug("Matched {} segments after pruning", numSelectedSegments);
+ if (numSelectedSegments == 0) {
+ // Only return metadata for streaming query
+ DataTable dataTable = enableStreaming ? new DataTableImplV2() :
DataTableUtils.buildEmptyDataTable(queryContext);
+ Map<String, String> metadata = dataTable.getMetadata();
+ metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY,
String.valueOf(numTotalDocs));
+ metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0");
+ metadata.put(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY, "0");
+ metadata.put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY,
"0");
+ metadata.put(DataTable.NUM_SEGMENTS_PROCESSED, "0");
+ metadata.put(DataTable.NUM_SEGMENTS_MATCHED, "0");
+ return dataTable;
+ } else {
+ TimerContext.Timer planBuildTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
+ Plan queryPlan = enableStreaming ? _planMaker
+ .makeStreamingInstancePlan(selectedSegments, queryContext,
executorService, responseObserver, endTimeMs)
+ : _planMaker.makeInstancePlan(selectedSegments, queryContext,
executorService, endTimeMs);
+ planBuildTimer.stopAndRecord();
+
+ TimerContext.Timer planExecTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
+ DataTable dataTable = queryPlan.execute();
+ planExecTimer.stopAndRecord();
+
+ // Update the total docs in the metadata based on the un-pruned segments
+ dataTable.getMetadata().put(DataTable.TOTAL_DOCS_METADATA_KEY,
Long.toString(numTotalDocs));
+
+ return dataTable;
+ }
+ }
+
+ /**
+ * Handles the subquery in the given query.
+ * <p>Currently only supports subquery within IN_PARTITIONED_SUBQUERY
function.
+ */
+ private void handleSubquery(QueryContext queryContext, List<IndexSegment>
indexSegments, TimerContext timerContext,
+ ExecutorService executorService, long endTimeMs)
+ throws Exception {
+ FilterContext filter = queryContext.getFilter();
+ if (filter != null) {
+ handleSubquery(filter, indexSegments, timerContext, executorService,
endTimeMs);
+ }
+ }
+
+ /**
+ * Handles the subquery in the filter.
+ * <p>Currently only supports subquery within IN_PARTITIONED_SUBQUERY
function.
+ */
+ private void handleSubquery(FilterContext filter, List<IndexSegment>
indexSegments, TimerContext timerContext,
+ ExecutorService executorService, long endTimeMs)
+ throws Exception {
+ Predicate predicate = filter.getPredicate();
+ if (predicate != null) {
+ handleSubquery(predicate.getLhs(), indexSegments, timerContext,
executorService, endTimeMs);
+ } else {
+ for (FilterContext child : filter.getChildren()) {
+ handleSubquery(child, indexSegments, timerContext, executorService,
endTimeMs);
+ }
+ }
+ }
+
+ /**
+ * Handles the subquery in the given expression.
+ * <p>Currently only supports subquery within IN_PARTITIONED_SUBQUERY
function.
+ */
+ private void handleSubquery(ExpressionContext expression, List<IndexSegment>
indexSegments, TimerContext timerContext,
Review comment:
Add Javadoc describing what this method does.
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
##########
@@ -277,4 +253,112 @@ public DataTable processQuery(ServerQueryRequest
queryRequest, ExecutorService e
LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId,
dataTable);
return dataTable;
}
+
+ private DataTable processQuery(List<IndexSegment> indexSegments,
QueryContext queryContext, TimerContext timerContext,
+ ExecutorService executorService, @Nullable
StreamObserver<Server.ServerResponse> responseObserver, long endTimeMs,
+ boolean enableStreaming)
+ throws Exception {
+ handleSubquery(queryContext, indexSegments, timerContext, executorService,
endTimeMs);
+
+ // Compute total docs for the table before pruning the segments
+ long numTotalDocs = 0;
+ for (IndexSegment indexSegment : indexSegments) {
+ numTotalDocs += indexSegment.getSegmentMetadata().getTotalDocs();
+ }
+
+ TimerContext.Timer segmentPruneTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
+ List<IndexSegment> selectedSegments =
_segmentPrunerService.prune(indexSegments, queryContext);
+ segmentPruneTimer.stopAndRecord();
+ int numSelectedSegments = selectedSegments.size();
+ LOGGER.debug("Matched {} segments after pruning", numSelectedSegments);
+ if (numSelectedSegments == 0) {
+ // Only return metadata for streaming query
+ DataTable dataTable = enableStreaming ? new DataTableImplV2() :
DataTableUtils.buildEmptyDataTable(queryContext);
+ Map<String, String> metadata = dataTable.getMetadata();
+ metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY,
String.valueOf(numTotalDocs));
+ metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0");
+ metadata.put(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY, "0");
+ metadata.put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY,
"0");
+ metadata.put(DataTable.NUM_SEGMENTS_PROCESSED, "0");
+ metadata.put(DataTable.NUM_SEGMENTS_MATCHED, "0");
+ return dataTable;
+ } else {
+ TimerContext.Timer planBuildTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
+ Plan queryPlan = enableStreaming ? _planMaker
+ .makeStreamingInstancePlan(selectedSegments, queryContext,
executorService, responseObserver, endTimeMs)
+ : _planMaker.makeInstancePlan(selectedSegments, queryContext,
executorService, endTimeMs);
+ planBuildTimer.stopAndRecord();
+
+ TimerContext.Timer planExecTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
+ DataTable dataTable = queryPlan.execute();
+ planExecTimer.stopAndRecord();
+
+ // Update the total docs in the metadata based on the un-pruned segments
+ dataTable.getMetadata().put(DataTable.TOTAL_DOCS_METADATA_KEY,
Long.toString(numTotalDocs));
+
+ return dataTable;
+ }
+ }
+
+ /**
+ * Handles the subquery in the given query.
+ * <p>Currently only supports subquery within IN_PARTITIONED_SUBQUERY
function.
+ */
+ private void handleSubquery(QueryContext queryContext, List<IndexSegment>
indexSegments, TimerContext timerContext,
+ ExecutorService executorService, long endTimeMs)
+ throws Exception {
+ FilterContext filter = queryContext.getFilter();
+ if (filter != null) {
+ handleSubquery(filter, indexSegments, timerContext, executorService,
endTimeMs);
+ }
+ }
+
+ /**
+ * Handles the subquery in the filter.
+ * <p>Currently only supports subquery within IN_PARTITIONED_SUBQUERY
function.
+ */
+ private void handleSubquery(FilterContext filter, List<IndexSegment>
indexSegments, TimerContext timerContext,
+ ExecutorService executorService, long endTimeMs)
+ throws Exception {
+ Predicate predicate = filter.getPredicate();
+ if (predicate != null) {
+ handleSubquery(predicate.getLhs(), indexSegments, timerContext,
executorService, endTimeMs);
+ } else {
+ for (FilterContext child : filter.getChildren()) {
+ handleSubquery(child, indexSegments, timerContext, executorService,
endTimeMs);
+ }
+ }
+ }
+
+ /**
+ * Handles the subquery in the given expression.
+ * <p>Currently only supports subquery within IN_PARTITIONED_SUBQUERY
function.
+ */
+ private void handleSubquery(ExpressionContext expression, List<IndexSegment>
indexSegments, TimerContext timerContext,
+ ExecutorService executorService, long endTimeMs)
+ throws Exception {
+ FunctionContext function = expression.getFunction();
+ if (function == null) {
+ return;
+ }
+ List<ExpressionContext> arguments = function.getArguments();
+ if (StringUtils.remove(function.getFunctionName(),
'_').equalsIgnoreCase(IN_PARTITIONED_SUBQUERY)) {
+ Preconditions
+ .checkState(arguments.size() == 2, "IN_PARTITIONED_SUBQUERY requires
2 arguments: expression, subquery");
+ ExpressionContext subqueryExpression = arguments.get(1);
+ Preconditions.checkState(subqueryExpression.getType() ==
ExpressionContext.Type.LITERAL,
+ "Second argument of IN_PARTITIONED_SUBQUERY must be a literal
(subquery)");
+ QueryContext subquery =
QueryContextConverterUtils.getQueryContextFromSQL(subqueryExpression.getLiteral());
+ DataTable dataTable =
Review comment:
Please add more Javadoc here as well.
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -372,5 +399,46 @@ private static void getAggregations(FilterContext filter,
List<FunctionContext>
getAggregations(filter.getPredicate().getLhs(), aggregations);
}
}
+
+ /**
+ * Helper method to extract the columns (IDENTIFIER expressions) for the
query.
+ */
+ public void extractColumns(QueryContext query) {
Review comment:
Why is this a public method? Who invokes it, and from where?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]