mingmwang commented on a change in pull request #1842:
URL: https://github.com/apache/arrow-datafusion/pull/1842#discussion_r810728154
##########
File path: ballista/rust/scheduler/src/planner.rs
##########
@@ -55,24 +56,41 @@ impl Default for DistributedPlanner {
impl DistributedPlanner {
/// Returns a vector of ExecutionPlans, where the root node is a [ShuffleWriterExec].
- /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec].
+ /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec]
+ /// or of type [ShuffleStreamReaderExec] if the created stages are all-at-once stages.
/// A [ShuffleWriterExec] is created whenever the partitioning changes.
pub async fn plan_query_stages<'a>(
&'a mut self,
job_id: &'a str,
execution_plan: Arc<dyn ExecutionPlan>,
) -> Result<Vec<Arc<ShuffleWriterExec>>> {
- info!("planning query stages");
- let (new_plan, mut stages) = self
- .plan_query_stages_internal(job_id, execution_plan)
+ info!("planning query stages for job {}", job_id);
+ let (modified_plan, mut stages) = self
+ .plan_query_stages_internal(job_id, execution_plan.clone())
.await?;
- stages.push(create_shuffle_writer(
- job_id,
- self.next_stage_id(),
- new_plan,
- None,
- )?);
- Ok(stages)
+ // Re-plan the input execution plan and create all-at-once query stages.
+ // For now we simply depend on the stage count to decide whether to create all-at-once or normal stages.
+ // In the future, we can use a more sophisticated way to decide which way to go.
+ if stages.len() > 1 && stages.len() <= 4 {
Review comment:
> If I understand the original design correctly, the "all-at-once" plan will only get scheduled when there are sufficient task slots available to run the entire plan. So should this be a function of the total number of partitions?
Yes, you are right. But currently the scheduler server doesn't have a clear view of how many task slots are available, so here I just add a simple check on the stage count. After @yahoNanJing refactors the scheduler state and keeps more CPU/task info in the in-memory state, we can add more sophisticated check logic.
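
For illustration only, here is a minimal Rust sketch of what a partition-based check could look like once the scheduler exposes free task slots. `can_schedule_all_at_once` and `available_task_slots` are hypothetical names, not part of this PR or the current scheduler state:

```rust
use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical heuristic: only take the all-at-once path when every task of
/// every stage fits into the currently free task slots.
/// `available_task_slots` stands in for the information the refactored
/// scheduler state is expected to expose.
fn can_schedule_all_at_once(
    stages: &[Arc<dyn ExecutionPlan>],
    available_task_slots: usize,
) -> bool {
    // A stage launches one task per partition of its input plan.
    let total_tasks: usize = stages
        .iter()
        .map(|stage| {
            stage
                .children()
                .first()
                .map(|child| child.output_partitioning().partition_count())
                .unwrap_or(0)
        })
        .sum();
    stages.len() > 1 && total_tasks <= available_task_slots
}
```

Such a call could eventually replace the `stages.len() > 1 && stages.len() <= 4` check, with the stage-count guard kept as a fallback while slot accounting is still being added.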