This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 4fa5b5f3 Fix job hangs when partition count of plan is zero (#1024)
4fa5b5f3 is described below

commit 4fa5b5f336ef0cca8597c830584ad267bdb53947
Author: 张林伟 <[email protected]>
AuthorDate: Mon Jul 15 22:30:01 2024 +0800

    Fix job hangs when partition count of plan is zero (#1024)
    
    * Fix job hangs when partition count of plan is zero
    
    * fix compilation issues
    
    ---------
    
    Co-authored-by: Andy Grove <[email protected]>
---
 ballista/core/src/serde/generated/ballista.rs |  1 -
 ballista/scheduler/src/state/mod.rs           | 24 ++++++++++++++++++++++--
 2 files changed, 22 insertions(+), 3 deletions(-)

diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs
index 50df950a..13a17f7e 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -1,4 +1,3 @@
-// This file is @generated by prost-build.
 /// /////////////////////////////////////////////////////////////////////////////////////////////////
 /// Ballista Physical Plan
 /// /////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 90c9f6f3..b30c9d92 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -15,10 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
+use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
 use datafusion::datasource::listing::{ListingTable, ListingTableUrl};
 use datafusion::datasource::source_as_provider;
 use datafusion::error::DataFusionError;
+use datafusion::physical_plan::ExecutionPlanProperties;
 use std::any::type_name;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -39,6 +40,7 @@ use ballista_core::serde::protobuf::TaskStatus;
 use ballista_core::serde::BallistaCodec;
 use datafusion::logical_expr::LogicalPlan;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::prelude::SessionContext;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use datafusion_proto::physical_plan::AsExecutionPlan;
@@ -408,8 +410,26 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
             DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
         );
 
+        let plan = plan.transform_down(&|node| {
+            if node.output_partitioning().partition_count() == 0 {
+                Ok(Transformed::yes(Arc::new(EmptyExec::new(node.schema()))))
+            } else {
+                Ok(Transformed::no(node))
+            }
+        })?;
+        debug!(
+            "Transformed physical plan: {}",
+            DisplayableExecutionPlan::new(plan.data.as_ref()).indent(false)
+        );
+
         self.task_manager
-            .submit_job(job_id, job_name, &session_ctx.session_id(), plan, queued_at)
+            .submit_job(
+                job_id,
+                job_name,
+                &session_ctx.session_id(),
+                plan.data,
+                queued_at,
+            )
             .await?;
 
         let elapsed = start.elapsed();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to