metegenez commented on code in PR #1726:
URL: 
https://github.com/apache/datafusion-ballista/pull/1726#discussion_r3267166923


##########
ballista/scheduler/src/state/mod.rs:
##########
@@ -380,120 +366,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
         job_id: &str,
         job_name: &str,
         session_ctx: Arc<SessionContext>,
-        plan: &LogicalPlan,
+        logical_plan: &LogicalPlan,
         queued_at: u64,
         subscriber: Option<JobStatusSubscriber>,
     ) -> Result<()> {
         let start = Instant::now();
-        let session_config = Arc::new(session_ctx.copied_config());
-        if log::max_level() >= log::Level::Debug {
-            // optimizing the plan here is redundant because the physical 
planner will do this again
-            // but it is helpful to see what the optimized plan will be
-            let optimized_plan = session_ctx.state().optimize(plan)?;
-            debug!("Optimized plan: {}", optimized_plan.display_indent());
-        }
-
-        let mut explain_inner_logical_plan: Option<Arc<LogicalPlan>> = None;
-        plan.apply(&mut |plan: &LogicalPlan| {
-            if let LogicalPlan::TableScan(scan) = plan {
-                let provider = source_as_provider(&scan.source)?;
-                if let Some(table) = 
provider.as_any().downcast_ref::<ListingTable>() {
-                    let local_paths: Vec<&ListingTableUrl> = table
-                        .table_paths()
-                        .iter()
-                        .filter(|url| url.as_str().starts_with("file:///"))
-                        .collect();
-                    if !local_paths.is_empty() {
-                        // These are local files rather than remote object 
stores, so we
-                        // need to check that they are accessible on the 
scheduler (the client
-                        // may not be on the same host, or the data path may 
not be correctly
-                        // mounted in the container). There could be thousands 
of files so we
-                        // just check the first one.
-                        let url = &local_paths[0].as_str();
-                        // the unwraps are safe here because we checked that 
the url starts with file:///
-                        // we need to check both versions here to support 
Linux & Windows
-                        
ListingTableUrl::parse(url.strip_prefix("file://").unwrap())
-                            .or_else(|_| {
-                                ListingTableUrl::parse(
-                                    url.strip_prefix("file:///").unwrap(),
-                                )
-                            })
-                            .map_err(|e| {
-                                DataFusionError::External(
-                                    format!(
-                                        "logical plan refers to path on local 
file system \
-                                that is not accessible in the scheduler: 
{url}: {e:?}"
-                                    )
-                                        .into(),
-                                )
-                            })?;
-                    }
-                }
-            } else if let LogicalPlan::Explain(explain_plan) = plan {
-                explain_inner_logical_plan = Some(explain_plan.plan.clone());
-            }
-            Ok(TreeNodeRecursion::Continue)
-        })?;
-
-        let explain_distributed_plan = if let Some(inner_lp) = 
explain_inner_logical_plan
-        {
-            Some(
-                generate_distributed_explain_plan(job_id, session_ctx.clone(), 
inner_lp)
-                    .await?,
-            )
-        } else {
-            None
-        };
-        let logical_plan_str = plan.display_indent().to_string();
-
-        let plan = session_ctx.state().create_physical_plan(plan).await?;
-        debug!(
-            "Physical plan: {}",
-            DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
-        );
-
-        let plan = plan.transform_down(&|node: Arc<dyn ExecutionPlan>| {
-            if node.output_partitioning().partition_count() == 0 {
-                let empty: Arc<dyn ExecutionPlan> =
-                    Arc::new(EmptyExec::new(node.schema()));

Review Comment:
   I tried to reproduce a case for this but couldn't. I also think this is safe.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to