This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new b728351e fix: fail job in case of serde error (pull-mode) (#1297)
b728351e is described below

commit b728351e251aa00ab0639c58a2ad5c9305d7e5af
Author: Marko Milenković <milenkov...@users.noreply.github.com>
AuthorDate: Thu Aug 28 14:50:37 2025 +0100

    fix: fail job in case of serde error (pull-mode) (#1297)
---
 ballista/client/tests/context_unsupported.rs    | 42 +++++++++++++++++++++++++
 ballista/scheduler/src/scheduler_server/grpc.rs |  4 +++
 ballista/scheduler/src/scheduler_server/mod.rs  | 20 ++++++++++++
 3 files changed, 66 insertions(+)

diff --git a/ballista/client/tests/context_unsupported.rs b/ballista/client/tests/context_unsupported.rs
index 0b97e20a..3f17d330 100644
--- a/ballista/client/tests/context_unsupported.rs
+++ b/ballista/client/tests/context_unsupported.rs
@@ -146,4 +146,46 @@ mod unsupported {
 
         assert_batches_eq!(expected, &result);
     }
+
+    // at the moment sort merge join is not supported due to
+    // serde issues. it should be supported with DF.50
+    #[rstest]
+    #[case::standalone(standalone_context())]
+    #[case::remote(remote_context())]
+    #[tokio::test]
+    #[should_panic]
+    async fn should_support_sort_merge_join(
+        #[future(awt)]
+        #[case]
+        ctx: SessionContext,
+        test_data: String,
+    ) {
+        ctx.register_parquet(
+            "t0",
+            &format!("{test_data}/alltypes_plain.parquet"),
+            Default::default(),
+        )
+        .await
+        .unwrap();
+
+        ctx.register_parquet(
+            "t1",
+            &format!("{test_data}/alltypes_plain.parquet"),
+            Default::default(),
+        )
+        .await
+        .unwrap();
+        ctx.sql("SET datafusion.optimizer.prefer_hash_join = false")
+            .await
+            .unwrap()
+            .show()
+            .await
+            .unwrap();
+        ctx.sql("select t0.id from t0 join t1 on t0.id = t1.id")
+            .await
+            .unwrap()
+            .show()
+            .await
+            .unwrap();
+    }
 }
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 7cf9d795..e9337d9c 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -130,10 +130,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
 
             let mut tasks = vec![];
             for (_, task) in schedulable_tasks {
+                let job_id = task.partition.job_id.clone();
                 match self.state.task_manager.prepare_task_definition(task) {
                     Ok(task_definition) => tasks.push(task_definition),
                     Err(e) => {
                         error!("Error preparing task definition: {e:?}");
+                        if let Err(e) = self.fail_job(job_id, e.to_string()).await {
+                            error!("Error when failing job: {e:?}")
+                        }
                     }
                 }
             }
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index 048dbcf8..e741601e 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -175,6 +175,26 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
         Ok(())
     }
 
+    pub(crate) async fn fail_job(
+        &self,
+        job_id: String,
+        fail_message: String,
+    ) -> Result<()> {
+        log::debug!("Received fail job request for job {job_id}");
+
+        self.query_stage_event_loop
+            .get_sender()?
+            .post_event(QueryStageSchedulerEvent::JobRunningFailed {
+                job_id,
+                fail_message,
+                queued_at: timestamp_millis(),
+                failed_at: timestamp_millis(),
+            })
+            .await?;
+
+        Ok(())
+    }
+
     /// Submits a job to executor returning job_id
     pub async fn submit_job(
         &self,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to