This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
     new b728351e fix: fail job in case of serde error (pull-mode) (#1297)
b728351e is described below

commit b728351e251aa00ab0639c58a2ad5c9305d7e5af
Author: Marko Milenković <milenkov...@users.noreply.github.com>
AuthorDate: Thu Aug 28 14:50:37 2025 +0100

    fix: fail job in case of serde error (pull-mode) (#1297)
---
 ballista/client/tests/context_unsupported.rs | 42 +++++++++++++++++++++++++
 ballista/scheduler/src/scheduler_server/grpc.rs | 4 +++
 ballista/scheduler/src/scheduler_server/mod.rs | 20 ++++++++++++
 3 files changed, 66 insertions(+)

diff --git a/ballista/client/tests/context_unsupported.rs b/ballista/client/tests/context_unsupported.rs
index 0b97e20a..3f17d330 100644
--- a/ballista/client/tests/context_unsupported.rs
+++ b/ballista/client/tests/context_unsupported.rs
@@ -146,4 +146,46 @@ mod unsupported {

         assert_batches_eq!(expected, &result);
     }
+
+    // at the moment sort merge join is not supported due to
+    // serde issues. it should be supported with DF.50
+    #[rstest]
+    #[case::standalone(standalone_context())]
+    #[case::remote(remote_context())]
+    #[tokio::test]
+    #[should_panic]
+    async fn should_support_sort_merge_join(
+        #[future(awt)]
+        #[case]
+        ctx: SessionContext,
+        test_data: String,
+    ) {
+        ctx.register_parquet(
+            "t0",
+            &format!("{test_data}/alltypes_plain.parquet"),
+            Default::default(),
+        )
+        .await
+        .unwrap();
+
+        ctx.register_parquet(
+            "t1",
+            &format!("{test_data}/alltypes_plain.parquet"),
+            Default::default(),
+        )
+        .await
+        .unwrap();
+        ctx.sql("SET datafusion.optimizer.prefer_hash_join = false")
+            .await
+            .unwrap()
+            .show()
+            .await
+            .unwrap();
+        ctx.sql("select t0.id from t0 join t1 on t0.id = t1.id")
+            .await
+            .unwrap()
+            .show()
+            .await
+            .unwrap();
+    }
 }
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 7cf9d795..e9337d9c 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -130,10 +130,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc

             let mut tasks = vec![];
             for (_, task) in schedulable_tasks {
+                let job_id = task.partition.job_id.clone();
                 match self.state.task_manager.prepare_task_definition(task) {
                     Ok(task_definition) => tasks.push(task_definition),
                     Err(e) => {
                         error!("Error preparing task definition: {e:?}");
+                        if let Err(e) = self.fail_job(job_id, e.to_string()).await {
+                            error!("Error when failing job: {e:?}")
+                        }
                     }
                 }
             }
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index 048dbcf8..e741601e 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -175,6 +175,26 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
         Ok(())
     }

+    pub(crate) async fn fail_job(
+        &self,
+        job_id: String,
+        fail_message: String,
+    ) -> Result<()> {
+        log::debug!("Received fail job request for job {job_id}");
+
+        self.query_stage_event_loop
+            .get_sender()?
+            .post_event(QueryStageSchedulerEvent::JobRunningFailed {
+                job_id,
+                fail_message,
+                queued_at: timestamp_millis(),
+                failed_at: timestamp_millis(),
+            })
+            .await?;
+
+        Ok(())
+    }
+
     /// Submits a job to executor returning job_id
     pub async fn submit_job(
         &self,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org