This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 35d13cffa14 providers(databricks): fail deferrable runs that terminate before defer (#62917)
35d13cffa14 is described below
commit 35d13cffa1420d2f1454f18c3fe6d25ff3c213c2
Author: Kushal Bohra <[email protected]>
AuthorDate: Wed Mar 11 18:30:00 2026 -0700
providers(databricks): fail deferrable runs that terminate before defer (#62917)
---
.../airflow/providers/databricks/operators/databricks.py | 13 +++++++++++--
.../tests/unit/databricks/operators/test_databricks.py | 10 ++++++----
2 files changed, 17 insertions(+), 6 deletions(-)
diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
index 95943e8b6b6..2ec8f03b4ed 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
@@ -210,8 +210,17 @@ def _handle_deferrable_databricks_operator_execution(operator, hook, log, contex
method_name=DEFER_METHOD_NAME,
)
else:
- if run_state.is_successful:
- log.info("%s completed successfully.", operator.task_id)
+ failed_tasks = extract_failed_task_errors(hook, run_info, run_state)
+ operator.execute_complete(
+ context=context,
+ event={
+ "run_id": operator.run_id,
+ "run_page_url": run_page_url,
+ "run_state": run_state.to_json(),
+ "repair_run": getattr(operator, "repair_run", False),
+ "errors": failed_tasks,
+ },
+ )
def _handle_deferrable_databricks_operator_completion(event: dict, log: Logger) -> None:
diff --git a/providers/databricks/tests/unit/databricks/operators/test_databricks.py b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
index ca2561dca07..82888fc461b 100644
--- a/providers/databricks/tests/unit/databricks/operators/test_databricks.py
+++ b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
@@ -1083,7 +1083,7 @@ class TestDatabricksSubmitRunOperator:
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksSubmitRunOperator.defer")
def test_databricks_submit_run_deferrable_operator_failed_before_defer(self, mock_defer, db_mock_class):
- """Asserts that a task is not deferred when its failed"""
+ """Asserts that terminal failures before deferral fail the task immediately."""
run = {
"new_cluster": NEW_CLUSTER,
"notebook_task": NOTEBOOK_TASK,
@@ -1092,7 +1092,8 @@ class TestDatabricksSubmitRunOperator:
db_mock = db_mock_class.return_value
db_mock.submit_run.return_value = RUN_ID
db_mock.get_run = make_run_with_state_mock("TERMINATED", "FAILED")
- op.execute(None)
+ with pytest.raises(AirflowException, match="Job run failed with terminal state"):
+ op.execute(None)
expected = utils.normalise_json_content(
{"new_cluster": NEW_CLUSTER, "notebook_task": NOTEBOOK_TASK, "run_name": TASK_ID}
@@ -1891,14 +1892,15 @@ class TestDatabricksRunNowOperator:
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksSubmitRunOperator.defer")
def test_databricks_run_now_deferrable_operator_failed_before_defer(self, mock_defer, db_mock_class):
- """Asserts that a task is not deferred when its failed"""
+ """Asserts that terminal failures before deferral fail the task immediately."""
run = {"notebook_params": NOTEBOOK_PARAMS, "notebook_task": NOTEBOOK_TASK, "jar_params": JAR_PARAMS}
op = DatabricksRunNowOperator(deferrable=True, task_id=TASK_ID, job_id=JOB_ID, json=run)
db_mock = db_mock_class.return_value
db_mock.run_now.return_value = RUN_ID
db_mock.get_run = make_run_with_state_mock("TERMINATED", "FAILED")
- op.execute(None)
+ with pytest.raises(AirflowException, match="Job run failed with terminal state"):
+ op.execute(None)
expected = utils.normalise_json_content(
{
"notebook_params": NOTEBOOK_PARAMS,