This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b65497ddb7 allow to prefetch for V2 engine as done for V1 engine (#14589)
b65497ddb7 is described below

commit b65497ddb7f14138e50fabaad6fcfda25b09624f
Author: Xiaobing <[email protected]>
AuthorDate: Tue Dec 3 20:47:18 2024 -0800

    allow to prefetch for V2 engine as done for V1 engine (#14589)
---
 .../core/plan/maker/InstancePlanMakerImplV2.java   | 26 +++++++++++++++++-----
 1 file changed, 21 insertions(+), 5 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index bd02391112..6912dd2586 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -256,14 +256,30 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   public Plan makeStreamingInstancePlan(List<SegmentContext> segmentContexts, QueryContext queryContext,
       ExecutorService executorService, ResultsBlockStreamer streamer, ServerMetrics serverMetrics) {
     applyQueryOptions(queryContext);
-    List<PlanNode> planNodes = new ArrayList<>(segmentContexts.size());
-    for (SegmentContext segmentContext : segmentContexts) {
-      planNodes.add(makeStreamingSegmentPlanNode(segmentContext, queryContext));
+
+    int numSegments = segmentContexts.size();
+    List<PlanNode> planNodes = new ArrayList<>(numSegments);
+    List<FetchContext> fetchContexts;
+    if (queryContext.isEnablePrefetch()) {
+      fetchContexts = new ArrayList<>(numSegments);
+      for (SegmentContext segmentContext : segmentContexts) {
+        FetchContext fetchContext =
+            _fetchPlanner.planFetchForProcessing(segmentContext.getIndexSegment(), queryContext);
+        fetchContexts.add(fetchContext);
+        planNodes.add(
+            new AcquireReleaseColumnsSegmentPlanNode(makeStreamingSegmentPlanNode(segmentContext, queryContext),
+                segmentContext, fetchContext));
+      }
+    } else {
+      fetchContexts = Collections.emptyList();
+      for (SegmentContext segmentContext : segmentContexts) {
+        planNodes.add(makeStreamingSegmentPlanNode(segmentContext, queryContext));
+      }
     }
+
     CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, queryContext, executorService, streamer);
     return new GlobalPlanImplV0(
-        new StreamingInstanceResponsePlanNode(combinePlanNode, segmentContexts, Collections.emptyList(), queryContext,
-            streamer));
+        new StreamingInstanceResponsePlanNode(combinePlanNode, segmentContexts, fetchContexts, queryContext, streamer));
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to