This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 08424fe4da2 Raise ValueError instead of KeyError when cancel_previous_runs=True and no job identifier is provided (#62393)
08424fe4da2 is described below

commit 08424fe4da204d8341525c46476bf173cd77ecfa
Author: SameerMesiah97 <[email protected]>
AuthorDate: Mon Mar 9 09:24:21 2026 +0000

    Raise ValueError instead of KeyError when cancel_previous_runs=True and no job identifier is provided (#62393)
    
    Co-authored-by: Wei Lee <[email protected]>
    Co-authored-by: Sameer Mesiah <[email protected]>
---
 .../providers/databricks/operators/databricks.py   |  9 ++++--
 .../unit/databricks/operators/test_databricks.py   | 36 +++++++++++++++++++++-
 2 files changed, 42 insertions(+), 3 deletions(-)

diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
index fe6240dad5c..2d70602bbd5 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
@@ -921,8 +921,13 @@ class DatabricksRunNowOperator(BaseOperator):
             self.json["job_id"] = job_id
             del self.json["job_name"]
 
-        if self.cancel_previous_runs and self.json["job_id"] is not None:
-            hook.cancel_all_runs(self.json["job_id"])
+        if self.cancel_previous_runs:
+            if (job_id := self.json.get("job_id")) is None:
+                raise ValueError(
+                    "cancel_previous_runs=True requires either job_id or job_name to be provided."
+                )
+
+            hook.cancel_all_runs(job_id)
 
         self.run_id = hook.run_now(self.json)
         if self.deferrable:
diff --git a/providers/databricks/tests/unit/databricks/operators/test_databricks.py b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
index 9c7b070a636..ca2561dca07 100644
--- a/providers/databricks/tests/unit/databricks/operators/test_databricks.py
+++ b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
@@ -21,7 +21,7 @@ import hashlib
 from datetime import datetime, timedelta
 from typing import Any
 from unittest import mock
-from unittest.mock import MagicMock
+from unittest.mock import MagicMock, call
 
 import pytest
 
@@ -1634,6 +1634,40 @@ class TestDatabricksRunNowOperator:
         db_mock.get_run_page_url.assert_called_once_with(RUN_ID)
         db_mock.get_run.assert_not_called()
 
+    @mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+    def test_cancel_previous_runs_without_job_id_raises(self, db_mock_class):
+        run = {
+            "notebook_params": NOTEBOOK_PARAMS,
+            "notebook_task": NOTEBOOK_TASK,
+            "jar_params": JAR_PARAMS,
+        }
+
+        op = DatabricksRunNowOperator(
+            task_id=TASK_ID,
+            json=run,
+            cancel_previous_runs=True,
+        )
+
+        db_mock = db_mock_class.return_value
+
+        with pytest.raises(
+            ValueError,
+            match="cancel_previous_runs=True requires either job_id or job_name",
+        ):
+            op.execute(None)
+
+        assert db_mock_class.mock_calls == [
+            call(
+                DEFAULT_CONN_ID,
+                retry_limit=op.databricks_retry_limit,
+                retry_delay=op.databricks_retry_delay,
+                retry_args=None,
+                caller="DatabricksRunNowOperator",
+            )
+        ]
+        assert db_mock.cancel_all_runs.mock_calls == []
+        assert db_mock.run_now.mock_calls == []
+
     @mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
     def test_execute_task_deferred(self, db_mock_class):
         """

Reply via email to