This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 69c0d7cc0c7 Fix: Repair action missing job parameters in `DatabricksRunNowOperator` (#67055)
69c0d7cc0c7 is described below

commit 69c0d7cc0c7d2d441d1ab7c581d4e0fe37c3ed1e
Author: Shen YuDong <[email protected]>
AuthorDate: Tue May 19 02:02:35 2026 +0800

    Fix: Repair action missing job parameters in `DatabricksRunNowOperator` (#67055)
    
    * fix issue: #64828, repair missing job parameters if exists
    
    * add unit test for job_parameters missing fix
---
 .../providers/databricks/operators/databricks.py   |  4 +++
 .../unit/databricks/operators/test_databricks.py   | 42 ++++++++++++++++++++++
 2 files changed, 46 insertions(+)

diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
index 788a12f1a36..4c581215be4 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
@@ -139,6 +139,8 @@ def _handle_databricks_operator_execution(operator, hook, log, context) -> None:
                     repair_json = {"run_id": operator.run_id, "rerun_all_failed_tasks": True}
                     if latest_repair_id is not None:
                         repair_json["latest_repair_id"] = latest_repair_id
+                    if "job_parameters" in operator.json:
+                        repair_json["job_parameters"] = operator.json["job_parameters"]
                     operator.json["latest_repair_id"] = hook.repair_run(repair_json)
                     _handle_databricks_operator_execution(operator, hook, log, context)
                 raise AirflowException(error_message)
@@ -976,6 +978,8 @@ class DatabricksRunNowOperator(BaseOperator):
                 repair_json = {"run_id": self.run_id, "rerun_all_failed_tasks": True}
                 if latest_repair_id is not None:
                     repair_json["latest_repair_id"] = latest_repair_id
+                if "job_parameters" in self.json:
+                    repair_json["job_parameters"] = self.json["job_parameters"]
                 self.json["latest_repair_id"] = self._hook.repair_run(repair_json)
                 _handle_deferrable_databricks_operator_execution(self, self._hook, self.log, context)
 
diff --git a/providers/databricks/tests/unit/databricks/operators/test_databricks.py b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
index e4ee05f36ef..84c625cddb7 100644
--- a/providers/databricks/tests/unit/databricks/operators/test_databricks.py
+++ b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
@@ -1045,6 +1045,48 @@ class TestDatabricksSubmitRunOperator:
         op = DatabricksSubmitRunOperator(deferrable=True, task_id=TASK_ID, json=run)
         assert op.execute_complete(context=None, event=event) is None
 
+    @mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+    @mock.patch(
+        "airflow.providers.databricks.operators.databricks._handle_deferrable_databricks_operator_execution"
+    )
+    def test_execute_complete_repair_includes_job_parameters(self, mock_handle_exec, mock_hook_class):
+        mock_hook_instance = mock_hook_class.return_value
+        mock_hook_instance.get_job_id.return_value = 42
+        mock_hook_instance.get_latest_repair_id.return_value = None
+        mock_hook_instance.repair_run.return_value = "new_repair_id"
+
+        operator = DatabricksRunNowOperator(
+            task_id="test_task",
+            job_id=42,
+            json={"job_parameters": {"key1": "value1"}},
+            repair_run=True,
+            databricks_conn_id="test_conn",
+        )
+
+        context = {}
+        event = {
+            "run_id": 12345,
+            "run_page_url": "https://databricks-instance/#job/42/run/12345",
+            "run_state": RunState(
+                life_cycle_state="TERMINATED", result_state="FAILED", state_message="Some error occurred"
+            ).to_json(),
+            "repair_run": True,
+            "errors": ["Error detail"],
+        }
+
+        operator.execute_complete(context=context, event=event)
+
+        assert mock_hook_instance.repair_run.called, "hook.repair_run should have been called"
+
+        call_args = mock_hook_instance.repair_run.call_args
+        repair_json_passed = call_args[0][0]
+
+        assert "job_parameters" in repair_json_passed
+        assert repair_json_passed["job_parameters"] == {"key1": "value1"}
+        assert repair_json_passed["run_id"] == 12345
+        assert repair_json_passed["rerun_all_failed_tasks"] is True
+        assert mock_handle_exec.called
+
     @mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
     def test_execute_complete_failure(self, db_mock_class):
         """

Reply via email to