thinkharderdev commented on a change in pull request #1842:
URL: https://github.com/apache/arrow-datafusion/pull/1842#discussion_r809912822



##########
File path: ballista/rust/scheduler/src/planner.rs
##########
@@ -55,24 +56,41 @@ impl Default for DistributedPlanner {
 
 impl DistributedPlanner {
     /// Returns a vector of ExecutionPlans, where the root node is a [ShuffleWriterExec].
-    /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec].
+    /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec]
+    /// or of type [ShuffleStreamReaderExec] if the created stages are all-at-once stages.
     /// A [ShuffleWriterExec] is created whenever the partitioning changes.
     pub async fn plan_query_stages<'a>(
         &'a mut self,
         job_id: &'a str,
         execution_plan: Arc<dyn ExecutionPlan>,
     ) -> Result<Vec<Arc<ShuffleWriterExec>>> {
-        info!("planning query stages");
-        let (new_plan, mut stages) = self
-            .plan_query_stages_internal(job_id, execution_plan)
+        info!("planning query stages for job {}", job_id);
+        let (modified_plan, mut stages) = self
+            .plan_query_stages_internal(job_id, execution_plan.clone())
             .await?;
-        stages.push(create_shuffle_writer(
-            job_id,
-            self.next_stage_id(),
-            new_plan,
-            None,
-        )?);
-        Ok(stages)
+        // re-plan the input execution plan and create All-at-once query stages.
+        // Now we just simply depends on the the stage count to decide whether to create All-at-once or normal stages.
+        // In future, we can have more sophisticated way to decide which way to go.
+        if stages.len() > 1 && stages.len() <= 4 {

Review comment:
       If I understand the original design correctly, the "all-at-once" plan
will only get scheduled when there are sufficient task slots available to run
the entire plan. So should this condition be a function of the total number of
partitions?
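
To make the question concrete, here is a rough sketch of what a partition-based check could look like. It is only an illustration under assumptions: `StageSummary`, `fits_all_at_once`, and `available_task_slots` are hypothetical names, not Ballista types or APIs, and it assumes each partition of each stage needs one task slot.

```rust
/// Hypothetical summary of one query stage (not a Ballista type).
struct StageSummary {
    /// Number of partitions in the stage; assumed to equal its task count.
    partitions: usize,
}

/// Returns true when there is more than one stage and the tasks of all
/// stages together fit into the currently available task slots.
/// `available_task_slots` is an assumed input that the scheduler would
/// have to supply.
fn fits_all_at_once(stages: &[StageSummary], available_task_slots: usize) -> bool {
    // Total tasks that would have to run concurrently across all stages.
    let total_tasks: usize = stages.iter().map(|s| s.partitions).sum();
    stages.len() > 1 && total_tasks <= available_task_slots
}
```

With a check like this, two stages of 2 partitions each would qualify when 4 slots are free, while two stages of 8 partitions each would fall back to normal stages.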





