This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b65497ddb7 allow to prefetch for V2 engine as done for V1 engine (#14589)
b65497ddb7 is described below
commit b65497ddb7f14138e50fabaad6fcfda25b09624f
Author: Xiaobing <[email protected]>
AuthorDate: Tue Dec 3 20:47:18 2024 -0800
allow to prefetch for V2 engine as done for V1 engine (#14589)
---
.../core/plan/maker/InstancePlanMakerImplV2.java | 26 +++++++++++++++++-----
1 file changed, 21 insertions(+), 5 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index bd02391112..6912dd2586 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -256,14 +256,30 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
public Plan makeStreamingInstancePlan(List<SegmentContext> segmentContexts, QueryContext queryContext,
ExecutorService executorService, ResultsBlockStreamer streamer, ServerMetrics serverMetrics) {
applyQueryOptions(queryContext);
- List<PlanNode> planNodes = new ArrayList<>(segmentContexts.size());
- for (SegmentContext segmentContext : segmentContexts) {
- planNodes.add(makeStreamingSegmentPlanNode(segmentContext, queryContext));
+
+ int numSegments = segmentContexts.size();
+ List<PlanNode> planNodes = new ArrayList<>(numSegments);
+ List<FetchContext> fetchContexts;
+ if (queryContext.isEnablePrefetch()) {
+ fetchContexts = new ArrayList<>(numSegments);
+ for (SegmentContext segmentContext : segmentContexts) {
+ FetchContext fetchContext =
+ _fetchPlanner.planFetchForProcessing(segmentContext.getIndexSegment(), queryContext);
+ fetchContexts.add(fetchContext);
+ planNodes.add(
+ new AcquireReleaseColumnsSegmentPlanNode(makeStreamingSegmentPlanNode(segmentContext, queryContext),
+ segmentContext, fetchContext));
+ }
+ } else {
+ fetchContexts = Collections.emptyList();
+ for (SegmentContext segmentContext : segmentContexts) {
+ planNodes.add(makeStreamingSegmentPlanNode(segmentContext, queryContext));
+ }
}
+
CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, queryContext, executorService, streamer);
return new GlobalPlanImplV0(
- new StreamingInstanceResponsePlanNode(combinePlanNode, segmentContexts, Collections.emptyList(), queryContext,
- streamer));
+ new StreamingInstanceResponsePlanNode(combinePlanNode, segmentContexts, fetchContexts, queryContext, streamer));
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]