milenkovicm opened a new issue, #1880:
URL: https://github.com/apache/datafusion-ballista/issues/1880

   **Is your feature request related to a problem or challenge? Please describe what you are trying to do.**
   
   `ShuffleReaderExec` holds information about every partition (`PartitionLocation`), even though a specific task (partition) does not need most of it. When the number of partitions is large, this makes the plan very big, and its serialisation hits the gRPC message size limit (https://github.com/apache/datafusion-ballista/pull/1853#issuecomment-4741128632).
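
   To get a feel for the growth, here is a back-of-the-envelope model (an assumption, not measured data). It assumes, as the experiment below suggests, that the reader's `partition` field is a `Vec<Vec<PartitionLocation>>` indexed by output partition, with one location per upstream map task. With `n` output partitions and `m` map tasks, every task plan then carries `n * m` locations while the task reads only `m` of them. The function name is hypothetical.

   ```rust
   /// Hypothetical model: number of `PartitionLocation` entries serialised
   /// into one task's plan for a single shuffle reader.
   fn locations_per_task_plan(output_partitions: usize, map_tasks: usize, pruned: bool) -> usize {
       if pruned {
           // Only the entry for the task's own output partition is kept.
           map_tasks
       } else {
           // The locations of every output partition are encoded.
           output_partitions * map_tasks
       }
   }

   fn main() {
       // Illustrative sizes only, not taken from the linked report.
       let (n, m) = (1_000, 1_000);
       println!("full: {}", locations_per_task_plan(n, m, false));
       println!("pruned: {}", locations_per_task_plan(n, m, true));
   }
   ```

   Under these assumptions the unpruned plan grows with the product of the two partition counts, while a pruned plan grows only with the number of upstream map tasks.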
   
   Currently, once serialised, the stage plan is cached and reused for each task. This caching can be disabled using the `disable-stage-plan-cache` scheduler feature, but even then the encoded plan still contains information about all partitions.
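
   The caching described above can be pictured as a map from stage id to encoded bytes, so only the first task of a stage pays the encoding cost. The sketch below is a simplified, hypothetical model (`StagePlanCache` and `get_or_encode` are illustrative names, not Ballista's `JobInfoCache` API). It uses a partition-aware encoder, which the current code does not have, to show why per-partition pruning and a cache keyed only by stage id do not mix: later tasks would receive a plan pruned for the wrong partition.

   ```rust
   use std::collections::HashMap;

   /// Hypothetical, simplified stand-in for a per-job stage plan cache.
   struct StagePlanCache {
       /// Encoded plans keyed by stage id only.
       encoded: HashMap<usize, Vec<u8>>,
       /// How many times the encoder actually ran.
       encode_calls: usize,
   }

   impl StagePlanCache {
       fn new() -> Self {
           Self { encoded: HashMap::new(), encode_calls: 0 }
       }

       /// Returns the cached plan for `stage_id`, encoding it on first use.
       /// `partition_id` is not part of the key, so every task of the stage
       /// receives whatever the first task's encoding produced.
       fn get_or_encode(
           &mut self,
           stage_id: usize,
           partition_id: usize,
           encode: impl Fn(usize) -> Vec<u8>,
       ) -> Vec<u8> {
           let calls = &mut self.encode_calls;
           self.encoded
               .entry(stage_id)
               .or_insert_with(|| {
                   *calls += 1;
                   encode(partition_id)
               })
               .clone()
       }
   }

   fn main() {
       let mut cache = StagePlanCache::new();
       // Hypothetical partition-aware encoder: the "plan" is just the partition id.
       let encode = |p: usize| vec![p as u8];
       let first = cache.get_or_encode(1, 0, encode);
       let second = cache.get_or_encode(1, 7, encode);
       // The task for partition 7 receives the plan encoded for partition 0.
       println!("{first:?} {second:?} encode_calls={}", cache.encode_calls);
   }
   ```

   This is presumably why the experiment below enables `disable-stage-plan-cache` by default: partition-specific encoding only makes sense when each task's plan is encoded separately.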
   
   **Describe the solution you'd like**
   
   Implement stage plan encoding logic that takes the task's actual partition into account and removes data unrelated to it.
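
   A minimal, dependency-free sketch of the pruning idea. It uses a hypothetical, simplified `PartitionLocation` (the real Ballista type has more fields) and assumes, as the experiment in this issue does, that the outer vector is indexed by output partition and that a task for partition `p` only needs entry `p`. As in the experiment, broadcast readers are left untouched; unlike the experiment, entries with at most one location are not kept.

   ```rust
   /// Hypothetical, simplified stand-in for Ballista's `PartitionLocation`.
   #[derive(Clone, Debug, PartialEq)]
   struct PartitionLocation {
       executor_id: String,
       path: String,
   }

   /// Returns a copy of `partitions` that keeps only the locations for
   /// `partition_id`; every other entry becomes an empty list so that the
   /// outer vector keeps its length and indices stay valid.
   fn prune_locations(
       partitions: &[Vec<PartitionLocation>],
       partition_id: usize,
       broadcast: bool,
   ) -> Vec<Vec<PartitionLocation>> {
       if broadcast {
           // Mirror the experiment: leave broadcast readers untouched.
           return partitions.to_vec();
       }
       partitions
           .iter()
           .enumerate()
           .map(|(i, locs)| if i == partition_id { locs.clone() } else { Vec::new() })
           .collect()
   }

   fn main() {
       let loc = |executor: &str, path: &str| PartitionLocation {
           executor_id: executor.to_string(),
           path: path.to_string(),
       };
       // Three output partitions, each written by up to two upstream map tasks.
       let partitions = vec![
           vec![loc("exec-1", "/tmp/shuffle/0/0"), loc("exec-2", "/tmp/shuffle/0/1")],
           vec![loc("exec-1", "/tmp/shuffle/1/0"), loc("exec-2", "/tmp/shuffle/1/1")],
           vec![loc("exec-1", "/tmp/shuffle/2/0")],
       ];
       let pruned = prune_locations(&partitions, 1, false);
       for (i, locs) in pruned.iter().enumerate() {
           println!("partition {i}: {} location(s)", locs.len());
       }
   }
   ```

   Emptying entries instead of dropping them keeps the outer length unchanged, which presumably matters if the reader looks up its locations by partition index.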
   
   **Describe alternatives you've considered**
   
   **Additional context**
   
   This work is orthogonal to #1853
   
   The `disable-stage-plan-cache` scheduler feature can disable plan caching, but the current implementation still does not take the actual partition into account.
   
   I ran a quick experiment with the following change:
   
   ```diff
   diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
   index f882d26f..3a60d897 100644
   --- a/ballista/scheduler/Cargo.toml
   +++ b/ballista/scheduler/Cargo.toml
   @@ -34,7 +34,7 @@ required-features = ["build-binary"]
    
    [features]
    build-binary = ["clap", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"]
   -default = ["build-binary", "rest-api"]
   +default = ["build-binary", "rest-api", "disable-stage-plan-cache"]
    # job info can cache stage plans, in some cases where
    # task plans can be re-computed, cache behavior may need to be disabled.
    disable-stage-plan-cache = []
   diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs
   index f04c10e6..255aa27e 100644
   --- a/ballista/scheduler/src/state/task_manager.rs
   +++ b/ballista/scheduler/src/state/task_manager.rs
   @@ -167,6 +167,7 @@ impl JobInfoCache {
        fn encode_stage_plan<U: AsExecutionPlan>(
            &mut self,
            stage_id: usize,
   +        partition_id: usize,
            plan: &Arc<dyn ExecutionPlan>,
            codec: &dyn PhysicalExtensionCodec,
        ) -> Result<Vec<u8>> {
   @@ -186,13 +187,57 @@ impl JobInfoCache {
        fn encode_stage_plan<U: AsExecutionPlan>(
            &mut self,
            _stage_id: usize,
   +        partition_id: usize,
            plan: &Arc<dyn ExecutionPlan>,
            codec: &dyn PhysicalExtensionCodec,
        ) -> Result<Vec<u8>> {
   +        use ballista_core::execution_plans::ShuffleReaderExec;
   +        use datafusion::common::tree_node::{Transformed, TreeNode};
   +
   +        let transformed = plan.clone().transform_up(|node| {
   +            if let Some(reader) = node.as_any().downcast_ref::<ShuffleReaderExec>() {
   +                if reader.broadcast {
   +                    return Ok(Transformed::no(node));
   +                }
   +                let new_partition: Vec<Vec<_>> = reader
   +                    .partition
   +                    .iter()
   +                    .enumerate()
   +                    .map(|(i, locs)| {
   +                        if i == partition_id || locs.len() <= 1 {
   +                            locs.clone()
   +                        } else {
   +                            vec![]
   +                        }
   +                    })
   +                    .collect();
   +                let new_reader = if let Some(coalesce) = reader.coalesce.clone() {
   +                    ShuffleReaderExec::try_new_coalesced(
   +                        reader.stage_id,
   +                        new_partition,
   +                        coalesce,
   +                        reader.schema(),
   +                        reader.properties().output_partitioning().clone(),
   +                    )?
   +                } else {
   +                    ShuffleReaderExec::try_new(
   +                        reader.stage_id,
   +                        new_partition,
   +                        reader.schema(),
   +                        reader.properties().output_partitioning().clone(),
   +                    )?
   +                };
   +                Ok(Transformed::yes(
   +                    Arc::new(new_reader) as Arc<dyn ExecutionPlan>
   +                ))
   +            } else {
   +                Ok(Transformed::no(node))
   +            }
   +        })?;
   +
            let mut plan_buf: Vec<u8> = vec![];
   -        let plan_proto = U::try_from_physical_plan(plan.clone(), codec)?;
   +        let plan_proto = U::try_from_physical_plan(transformed.data, codec)?;
            plan_proto.try_encode(&mut plan_buf)?;
   -
            Ok(plan_buf)
        }
    }
   @@ -656,6 +701,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
            if let Some(mut job_info) = self.active_job_cache.get_mut(&job_id) {
                let plan = job_info.encode_stage_plan::<PhysicalPlanNode>(
                    stage_id,
   +                task.partition.partition_id,
                    &task.plan,
                    self.codec.physical_extension_codec(),
                )?;
   @@ -733,6 +779,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
                if let Some(mut job_info) = self.active_job_cache.get_mut(&job_id) {
                    let plan = job_info.encode_stage_plan::<PhysicalPlanNode>(
                        stage_id,
   +                    task.partition.partition_id,
                        &task.plan,
                        self.codec.physical_extension_codec(),
                    )?;
   ```
   
   With this change, there is no longer an encoding error like the one seen in https://github.com/apache/datafusion-ballista/pull/1853#issuecomment-4742357916, but for some reason the query returns 0 results. I believe this is due to the last stage, for which my quick change does not work.
   
   <img width="1487" height="399" alt="Image" src="https://github.com/user-attachments/assets/5671006b-6cd7-4151-88f5-6740dcd09b3f" />
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
