This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 69c0d7cc0c7 Fix: Repair action missing job parameters in
`DatabricksRunNowOperator` (#67055)
69c0d7cc0c7 is described below
commit 69c0d7cc0c7d2d441d1ab7c581d4e0fe37c3ed1e
Author: Shen YuDong <[email protected]>
AuthorDate: Tue May 19 02:02:35 2026 +0800
Fix: Repair action missing job parameters in `DatabricksRunNowOperator`
(#67055)
* Fix issue #64828: pass job parameters to the repair request if they exist
* Add a unit test for the missing job_parameters fix
---
.../providers/databricks/operators/databricks.py | 4 +++
.../unit/databricks/operators/test_databricks.py | 42 ++++++++++++++++++++++
2 files changed, 46 insertions(+)
diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
index 788a12f1a36..4c581215be4 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
@@ -139,6 +139,8 @@ def _handle_databricks_operator_execution(operator, hook,
log, context) -> None:
repair_json = {"run_id": operator.run_id,
"rerun_all_failed_tasks": True}
if latest_repair_id is not None:
repair_json["latest_repair_id"] = latest_repair_id
+ if "job_parameters" in operator.json:
+ repair_json["job_parameters"] =
operator.json["job_parameters"]
operator.json["latest_repair_id"] =
hook.repair_run(repair_json)
_handle_databricks_operator_execution(operator, hook, log,
context)
raise AirflowException(error_message)
@@ -976,6 +978,8 @@ class DatabricksRunNowOperator(BaseOperator):
repair_json = {"run_id": self.run_id,
"rerun_all_failed_tasks": True}
if latest_repair_id is not None:
repair_json["latest_repair_id"] = latest_repair_id
+ if "job_parameters" in self.json:
+ repair_json["job_parameters"] = self.json["job_parameters"]
self.json["latest_repair_id"] =
self._hook.repair_run(repair_json)
_handle_deferrable_databricks_operator_execution(self,
self._hook, self.log, context)
diff --git a/providers/databricks/tests/unit/databricks/operators/test_databricks.py b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
index e4ee05f36ef..84c625cddb7 100644
--- a/providers/databricks/tests/unit/databricks/operators/test_databricks.py
+++ b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
@@ -1045,6 +1045,48 @@ class TestDatabricksSubmitRunOperator:
op = DatabricksSubmitRunOperator(deferrable=True, task_id=TASK_ID,
json=run)
assert op.execute_complete(context=None, event=event) is None
+ @mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+ @mock.patch(
+ "airflow.providers.databricks.operators.databricks._handle_deferrable_databricks_operator_execution"
+ )
+ def test_execute_complete_repair_includes_job_parameters(self,
mock_handle_exec, mock_hook_class):
+ mock_hook_instance = mock_hook_class.return_value
+ mock_hook_instance.get_job_id.return_value = 42
+ mock_hook_instance.get_latest_repair_id.return_value = None
+ mock_hook_instance.repair_run.return_value = "new_repair_id"
+
+ operator = DatabricksRunNowOperator(
+ task_id="test_task",
+ job_id=42,
+ json={"job_parameters": {"key1": "value1"}},
+ repair_run=True,
+ databricks_conn_id="test_conn",
+ )
+
+ context = {}
+ event = {
+ "run_id": 12345,
+ "run_page_url": "https://databricks-instance/#job/42/run/12345",
+ "run_state": RunState(
+ life_cycle_state="TERMINATED", result_state="FAILED",
state_message="Some error occurred"
+ ).to_json(),
+ "repair_run": True,
+ "errors": ["Error detail"],
+ }
+
+ operator.execute_complete(context=context, event=event)
+
+ assert mock_hook_instance.repair_run.called, "hook.repair_run should have been called"
+
+ call_args = mock_hook_instance.repair_run.call_args
+ repair_json_passed = call_args[0][0]
+
+ assert "job_parameters" in repair_json_passed
+ assert repair_json_passed["job_parameters"] == {"key1": "value1"}
+ assert repair_json_passed["run_id"] == 12345
+ assert repair_json_passed["rerun_all_failed_tasks"] is True
+ assert mock_handle_exec.called
+
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
def test_execute_complete_failure(self, db_mock_class):
"""