This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 35d13cffa14 providers(databricks): fail deferrable runs that terminate before defer (#62917)
35d13cffa14 is described below

commit 35d13cffa1420d2f1454f18c3fe6d25ff3c213c2
Author: Kushal Bohra <[email protected]>
AuthorDate: Wed Mar 11 18:30:00 2026 -0700

    providers(databricks): fail deferrable runs that terminate before defer (#62917)
---
 .../airflow/providers/databricks/operators/databricks.py    | 13 +++++++++++--
 .../tests/unit/databricks/operators/test_databricks.py      | 10 ++++++----
 2 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
index 95943e8b6b6..2ec8f03b4ed 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
@@ -210,8 +210,17 @@ def _handle_deferrable_databricks_operator_execution(operator, hook, log, contex
                 method_name=DEFER_METHOD_NAME,
             )
         else:
-            if run_state.is_successful:
-                log.info("%s completed successfully.", operator.task_id)
+            failed_tasks = extract_failed_task_errors(hook, run_info, run_state)
+            operator.execute_complete(
+                context=context,
+                event={
+                    "run_id": operator.run_id,
+                    "run_page_url": run_page_url,
+                    "run_state": run_state.to_json(),
+                    "repair_run": getattr(operator, "repair_run", False),
+                    "errors": failed_tasks,
+                },
+            )
 
 
 def _handle_deferrable_databricks_operator_completion(event: dict, log: Logger) -> None:
diff --git a/providers/databricks/tests/unit/databricks/operators/test_databricks.py b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
index ca2561dca07..82888fc461b 100644
--- a/providers/databricks/tests/unit/databricks/operators/test_databricks.py
+++ b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
@@ -1083,7 +1083,7 @@ class TestDatabricksSubmitRunOperator:
     @mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
     @mock.patch("airflow.providers.databricks.operators.databricks.DatabricksSubmitRunOperator.defer")
     def test_databricks_submit_run_deferrable_operator_failed_before_defer(self, mock_defer, db_mock_class):
-        """Asserts that a task is not deferred when its failed"""
+        """Asserts that terminal failures before deferral fail the task immediately."""
         run = {
             "new_cluster": NEW_CLUSTER,
             "notebook_task": NOTEBOOK_TASK,
@@ -1092,7 +1092,8 @@ class TestDatabricksSubmitRunOperator:
         db_mock = db_mock_class.return_value
         db_mock.submit_run.return_value = RUN_ID
         db_mock.get_run = make_run_with_state_mock("TERMINATED", "FAILED")
-        op.execute(None)
+        with pytest.raises(AirflowException, match="Job run failed with terminal state"):
+            op.execute(None)
 
         expected = utils.normalise_json_content(
             {"new_cluster": NEW_CLUSTER, "notebook_task": NOTEBOOK_TASK, "run_name": TASK_ID}
@@ -1891,14 +1892,15 @@ class TestDatabricksRunNowOperator:
     @mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
     @mock.patch("airflow.providers.databricks.operators.databricks.DatabricksSubmitRunOperator.defer")
     def test_databricks_run_now_deferrable_operator_failed_before_defer(self, mock_defer, db_mock_class):
-        """Asserts that a task is not deferred when its failed"""
+        """Asserts that terminal failures before deferral fail the task immediately."""
         run = {"notebook_params": NOTEBOOK_PARAMS, "notebook_task": NOTEBOOK_TASK, "jar_params": JAR_PARAMS}
         op = DatabricksRunNowOperator(deferrable=True, task_id=TASK_ID, job_id=JOB_ID, json=run)
         db_mock = db_mock_class.return_value
         db_mock.run_now.return_value = RUN_ID
         db_mock.get_run = make_run_with_state_mock("TERMINATED", "FAILED")
 
-        op.execute(None)
+        with pytest.raises(AirflowException, match="Job run failed with terminal state"):
+            op.execute(None)
         expected = utils.normalise_json_content(
             {
                 "notebook_params": NOTEBOOK_PARAMS,

Reply via email to