This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 4fa5b5f3 Fix job hangs when partition count of plan is zero (#1024)
4fa5b5f3 is described below
commit 4fa5b5f336ef0cca8597c830584ad267bdb53947
Author: 张林伟 <[email protected]>
AuthorDate: Mon Jul 15 22:30:01 2024 +0800
Fix job hangs when partition count of plan is zero (#1024)
* Fix job hangs when partition count of plan is zero
* fix compilation issues
---------
Co-authored-by: Andy Grove <[email protected]>
---
ballista/core/src/serde/generated/ballista.rs | 1 -
ballista/scheduler/src/state/mod.rs | 24 ++++++++++++++++++++++--
2 files changed, 22 insertions(+), 3 deletions(-)
diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs
index 50df950a..13a17f7e 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -1,4 +1,3 @@
-// This file is @generated by prost-build.
///
/////////////////////////////////////////////////////////////////////////////////////////////////
/// Ballista Physical Plan
///
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 90c9f6f3..b30c9d92 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -15,10 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
+use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion::datasource::listing::{ListingTable, ListingTableUrl};
use datafusion::datasource::source_as_provider;
use datafusion::error::DataFusionError;
+use datafusion::physical_plan::ExecutionPlanProperties;
use std::any::type_name;
use std::collections::HashMap;
use std::sync::Arc;
@@ -39,6 +40,7 @@ use ballista_core::serde::protobuf::TaskStatus;
use ballista_core::serde::BallistaCodec;
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::empty::EmptyExec;
use datafusion::prelude::SessionContext;
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
@@ -408,8 +410,26 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
);
+ let plan = plan.transform_down(&|node| {
+ if node.output_partitioning().partition_count() == 0 {
+ Ok(Transformed::yes(Arc::new(EmptyExec::new(node.schema()))))
+ } else {
+ Ok(Transformed::no(node))
+ }
+ })?;
+ debug!(
+ "Transformed physical plan: {}",
+ DisplayableExecutionPlan::new(plan.data.as_ref()).indent(false)
+ );
+
self.task_manager
- .submit_job(job_id, job_name, &session_ctx.session_id(), plan, queued_at)
+ .submit_job(
+ job_id,
+ job_name,
+ &session_ctx.session_id(),
+ plan.data,
+ queued_at,
+ )
.await?;
let elapsed = start.elapsed();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]