This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3d5c762aeca Forward Airflow Dag params to Databricks job parameters in 
CreateJobs/SubmitRun/RunNow (#66613)
3d5c762aeca is described below

commit 3d5c762aecae2a6b66b887771ba82403bb0a13e5
Author: Noritaka Sekiyama <[email protected]>
AuthorDate: Tue May 26 15:36:08 2026 +0900

    Forward Airflow Dag params to Databricks job parameters in 
CreateJobs/SubmitRun/RunNow (#66613)
    
    * Pass Airflow params as job parameters in DatabricksCreateJobsOperator
    
    * Add unit tests
    
    * Refactor airflow params auto-injection and extend to RunNow / SubmitRun
    
    Apply Lee-W's review suggestion from PR #39007: replace the manual loop
    with a list comprehension that uses ``params.dump()`` (the original
    ``params.items()`` iteration yielded ``Param`` objects rather than the
    resolved values, which would not serialise into the Databricks API).
    
    Extend the same pattern to:
    * DatabricksRunNowOperator -> populate top-level ``job_parameters`` (the
      dict-shaped slot already supported by the run-now endpoint).
    * DatabricksSubmitRunOperator -> populate dict-shaped per-task parameter
      fields (notebook_task.base_parameters, python_wheel_task.named_parameters,
      sql_task.parameters, run_job_task.job_parameters). Tasks whose only
      parameter field is ``List[str]`` (spark_jar_task, spark_python_task,
      spark_submit_task) are intentionally skipped because there is no
      canonical mapping from a key/value dict to positional CLI arguments.
    
    Drop the ``"parameters": []`` expectation that was added to the existing
    test_exec_create / test_exec_reset cases by PR #39007 — it never matched
    the source logic (``self.params`` is falsy when no params are set, so no
    ``parameters`` key is added).
    
    Add tests covering: auto-injection for each operator, no override when
    the field is already populated, and the per-task injection rules for
    SubmitRun.
    
    * Document Airflow params auto-injection in operator user guides
    
    Add a "Forwarding Airflow Dag params" section to the jobs_create, run_now,
    and submit_run operator guides describing the new behaviour: when the
    operator's params dict is non-empty and the corresponding json slot is
    empty, params are auto-injected as job-level parameters / job_parameters /
    per-task dict-shaped parameters respectively.
    
    * Fix static checks: PT006 tuple form for parametrize, dict() for 
self.params
    
    - pytest.mark.parametrize first arg must be a tuple of names, not a 
comma-separated
      string (PT006).
    - Replace self.params.dump() with dict(self.params) so the call works on 
both the
      ParamsDict and the plain-dict legs of self.params' union type, satisfying
      mypy union-attr.
    
    * Clarify Airflow-params-to-Databricks-params mapping in operator docs
    
    Address Lee-W's review feedback that the auto-injection example was hard to
    parse. Each operator's section now:
    
    - Names the exact Databricks API field being populated and links to its
      schema (parameters / job_parameters / per-task slots).
    - States explicitly that each <key>: <value> pair in params becomes one
      {"name": <key>, "default": <value>} entry (CreateJobs) or is passed
      through unchanged (RunNow / SubmitRun).
    - Splits params into a named variable in the CreateJobs example so the
      key/value to name/default mapping reads top-to-bottom.
    
    ---------
    
    Co-authored-by: subham611 <[email protected]>
---
 .../databricks/docs/operators/jobs_create.rst      |  35 +++++
 providers/databricks/docs/operators/run_now.rst    |  25 +++
 providers/databricks/docs/operators/submit_run.rst |  32 ++++
 .../providers/databricks/operators/databricks.py   |  58 +++++++
 .../unit/databricks/operators/test_databricks.py   | 174 +++++++++++++++++++++
 5 files changed, 324 insertions(+)

diff --git a/providers/databricks/docs/operators/jobs_create.rst 
b/providers/databricks/docs/operators/jobs_create.rst
index fd2d9a906ff..115dcc39d09 100644
--- a/providers/databricks/docs/operators/jobs_create.rst
+++ b/providers/databricks/docs/operators/jobs_create.rst
@@ -58,6 +58,41 @@ Currently the named parameters that 
``DatabricksCreateJobsOperator`` supports ar
   - ``access_control_list``
 
 
+Forwarding Airflow Dag params as Databricks job parameters
+----------------------------------------------------------
+
+The Databricks ``api/2.2/jobs/create`` endpoint accepts a top-level 
``parameters`` field
+that defines `job-level parameters
+<https://docs.databricks.com/api/workspace/jobs/create#parameters>`_ — a list 
of objects
+with a ``name`` (the parameter name) and a ``default`` (its default value), 
for example
+``[{"name": "env", "default": "prod"}]``.
+
+If ``parameters`` is not set in ``json`` and the operator's ``params`` dict is 
non-empty,
+each key/value pair in ``params`` is converted into one such ``{"name": key, 
"default":
+value}`` entry, so that Airflow Dag params can be forwarded as Databricks job 
parameters
+without hardcoding the API shape in ``json``. If ``json`` already contains 
``parameters``,
+it is left untouched.
+
+.. code-block:: python
+
+  # Airflow Dag params (key/value pairs)
+  params = {"env": "prod", "batch_size": "100"}
+
+  create_job = DatabricksCreateJobsOperator(
+      task_id="create_job",
+      json={"name": "my-job", "tasks": [...]},
+      params=params,
+  )
+
+  # The Databricks job created/reset by the operator will have:
+  #   parameters=[
+  #       {"name": "env",        "default": "prod"},
+  #       {"name": "batch_size", "default": "100"},
+  #   ]
+  # i.e. each "<key>: <value>" in params becomes one
+  # {"name": "<key>", "default": "<value>"} entry in the job definition.
+
+
 Examples
 --------
 
diff --git a/providers/databricks/docs/operators/run_now.rst 
b/providers/databricks/docs/operators/run_now.rst
index 1b51124e5f4..f39d872a0c1 100644
--- a/providers/databricks/docs/operators/run_now.rst
+++ b/providers/databricks/docs/operators/run_now.rst
@@ -49,6 +49,31 @@ All other parameters are optional and described in 
documentation for ``Databrick
 * ``repair_run``
 * ``cancel_previous_runs``
 
+Forwarding Airflow Dag params as Databricks job parameters
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The Databricks ``api/2.2/jobs/run-now`` endpoint accepts a top-level 
`job_parameters
+<https://docs.databricks.com/api/workspace/jobs/runnow#job_parameters>`_ field 
— a plain
+``Dict[str, str]`` mapping parameter name to value — that overrides the job's 
defaults
+for this run.
+
+If ``job_parameters`` is not set in ``json`` and the operator's ``params`` 
dict is
+non-empty, ``params`` is forwarded as ``job_parameters`` as-is, so Airflow Dag 
params can
+be passed dynamically to a run without hardcoding them in ``json``. If 
``json`` already
+contains ``job_parameters``, it is left untouched.
+
+.. code-block:: python
+
+  run_now = DatabricksRunNowOperator(
+      task_id="run_now",
+      job_id=123,
+      params={"env": "staging", "batch_size": "42"},
+  )
+  # The triggered run receives:
+  #   job_parameters={"env": "staging", "batch_size": "42"}
+  # i.e. the same dict, passed straight through to the run-now request body.
+
+
 DatabricksRunNowDeferrableOperator
 ==================================
 
diff --git a/providers/databricks/docs/operators/submit_run.rst 
b/providers/databricks/docs/operators/submit_run.rst
index 522a051d2e7..f4f78d2fa33 100644
--- a/providers/databricks/docs/operators/submit_run.rst
+++ b/providers/databricks/docs/operators/submit_run.rst
@@ -104,6 +104,38 @@ Another way to do is use the param tasks to pass array of 
objects to instantiate
   notebook_run = DatabricksSubmitRunOperator(task_id="notebook_run", 
tasks=tasks)
 
 
+Forwarding Airflow Dag params as task parameters
+------------------------------------------------
+
+Unlike ``api/2.2/jobs/create`` and ``api/2.2/jobs/run-now``, the
+``api/2.2/jobs/runs/submit`` endpoint has no top-level parameter slot — each 
task in
+``tasks`` carries its own parameters whose shape depends on the task type.
+
+If the operator's ``params`` dict is non-empty, it is forwarded as-is into the
+dict-shaped parameter slot of every task in ``json`` whose corresponding field 
is empty:
+
+* ``notebook_task.base_parameters``
+* ``python_wheel_task.named_parameters``
+* ``sql_task.parameters``
+* ``run_job_task.job_parameters``
+
+Tasks whose only parameter slot is ``List[str]`` (``spark_jar_task``, 
``spark_python_task``,
+``spark_submit_task``) are skipped because there is no canonical mapping from 
a key/value
+dict to a positional argument list — pass those parameters explicitly via the 
``json``
+or ``tasks`` argument.
+
+.. code-block:: python
+
+  notebook_run = DatabricksSubmitRunOperator(
+      task_id="notebook_run",
+      notebook_task={"notebook_path": 
"/Users/[email protected]/PrepareData"},
+      new_cluster={"spark_version": "15.4.x-scala2.12", "num_workers": 2},
+      params={"env": "dev", "shard": "1"},
+  )
+  # The submitted run's notebook_task.base_parameters becomes:
+  #   {"env": "dev", "shard": "1"}
+  # i.e. the same dict, copied into the task's dict-shaped parameter slot.
+
 
 Examples
 --------
diff --git 
a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py 
b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
index 4c581215be4..c9e7c6891fb 100644
--- 
a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
+++ 
b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
@@ -260,6 +260,27 @@ def 
_handle_deferrable_databricks_operator_completion(event: dict, log: Logger)
     raise AirflowException(error_message)
 
 
+# Mapping of task definition keys (in the runs/submit JSON) to the dict-shaped
+# parameter sub-key into which Airflow ``self.params`` will be auto-injected.
+# Tasks whose only parameter field is ``List[str]`` (e.g. spark_python_task,
+# spark_jar_task, spark_submit_task) are intentionally omitted because there is
+# no canonical way to convert a key/value dict to positional/CLI arguments.
+_DICT_PARAM_FIELD_BY_TASK = {
+    "notebook_task": "base_parameters",
+    "python_wheel_task": "named_parameters",
+    "sql_task": "parameters",
+    "run_job_task": "job_parameters",
+}
+
+
+def _inject_airflow_params_into_task(task: dict, params: dict) -> None:
+    """Set dict-shaped per-task parameter fields from ``params`` if they are 
not already set."""
+    for task_key, field in _DICT_PARAM_FIELD_BY_TASK.items():
+        task_def = task.get(task_key)
+        if isinstance(task_def, dict) and not task_def.get(field):
+            task_def[field] = dict(params)
+
+
 class DatabricksJobRunLink(BaseOperatorLink):
     """Constructs a link to monitor a Databricks Job Run."""
 
@@ -323,6 +344,12 @@ class DatabricksCreateJobsOperator(BaseOperator):
             might be a floating point number).
     :param databricks_retry_args: An optional dictionary with arguments passed 
to ``tenacity.Retrying`` class.
 
+    .. note::
+        If ``parameters`` is not set in ``json`` and the operator's ``params`` 
dict is non-empty,
+        the operator's ``params`` are automatically converted to job-level 
``parameters`` (a list
+        of ``{"name": k, "default": v}`` entries) so that Airflow Dag params 
can be forwarded as
+        Databricks job parameters without hardcoding them in ``json``.
+
     """
 
     # Used in airflow.models.BaseOperator
@@ -406,6 +433,8 @@ class DatabricksCreateJobsOperator(BaseOperator):
         if "name" not in self.json:
             raise AirflowException("Missing required parameter: name")
         job_id = self._hook.find_job_id_by_name(self.json["name"])
+        if not self.json.get("parameters") and self.params:
+            self.json["parameters"] = [{"name": k, "default": v} for k, v in 
dict(self.params).items()]
         if job_id is None:
             return self._hook.create_job(self.json)
         self._hook.reset_job(str(job_id), self.json)
@@ -531,6 +560,15 @@ class DatabricksSubmitRunOperator(BaseOperator):
 
         .. seealso::
             
https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit
+
+    .. note::
+        If the operator's ``params`` dict is non-empty, it is automatically 
forwarded into the
+        dict-shaped parameter slot of every task in ``json`` whose 
corresponding field is empty:
+        ``notebook_task.base_parameters``, 
``python_wheel_task.named_parameters``,
+        ``sql_task.parameters``, ``run_job_task.job_parameters``. Tasks whose 
only parameter
+        field is ``List[str]`` (``spark_jar_task``, ``spark_python_task``, 
``spark_submit_task``)
+        are skipped because there is no canonical mapping from a key/value 
dict to a positional
+        argument list.
     """
 
     # Used in airflow.models.BaseOperator
@@ -645,6 +683,17 @@ class DatabricksSubmitRunOperator(BaseOperator):
             pipeline_name = self.json["pipeline_task"]["pipeline_name"]
             self.json["pipeline_task"]["pipeline_id"] = 
self._hook.find_pipeline_id_by_name(pipeline_name)
             del self.json["pipeline_task"]["pipeline_name"]
+
+        if self.params:
+            params_dump = dict(self.params)
+            tasks = self.json.get("tasks")
+            if isinstance(tasks, list):
+                for task in tasks:
+                    if isinstance(task, dict):
+                        _inject_airflow_params_into_task(task, params_dump)
+            else:
+                _inject_airflow_params_into_task(self.json, params_dump)
+
         json_normalised = normalise_json_content(self.json)
         self.run_id = self._hook.submit_run(json_normalised)
         if self.deferrable:
@@ -844,6 +893,12 @@ class DatabricksRunNowOperator(BaseOperator):
             (https://docs.databricks.com/api/workspace/jobs/update). If 
nothing is matched, then repair
             will not get triggered.
     :param cancel_previous_runs: Cancel all existing running jobs before 
submitting new one.
+
+    .. note::
+        If ``job_parameters`` is not set in ``json`` and the operator's 
``params`` dict is
+        non-empty, the operator's ``params`` are automatically forwarded as 
``job_parameters``
+        so that Airflow Dag params can be passed dynamically to Databricks 
runs without
+        hardcoding them in ``json``.
     """
 
     # Used in airflow.models.BaseOperator
@@ -953,6 +1008,9 @@ class DatabricksRunNowOperator(BaseOperator):
 
             hook.cancel_all_runs(job_id)
 
+        if not self.json.get("job_parameters") and self.params:
+            self.json["job_parameters"] = dict(self.params)
+
         self.run_id = hook.run_now(self.json)
         if self.deferrable:
             _handle_deferrable_databricks_operator_execution(self, hook, 
self.log, context)
diff --git 
a/providers/databricks/tests/unit/databricks/operators/test_databricks.py 
b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
index 84c625cddb7..986cf51f1cf 100644
--- a/providers/databricks/tests/unit/databricks/operators/test_databricks.py
+++ b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
@@ -277,6 +277,7 @@ ACCESS_CONTROL_LIST = [
         "permission_level": "CAN_MANAGE",
     }
 ]
+JOB_PARAMS = [{"name": "param1", "default": "value1"}]
 
 
 def mock_dict(d: dict):
@@ -602,6 +603,67 @@ class TestDatabricksCreateJobsOperator:
 
         db_mock.update_job_permission.assert_not_called()
 
+    @pytest.mark.parametrize(
+        ("found_job_id", "hook_method"),
+        [
+            pytest.param(None, "create_job", id="create-path"),
+            pytest.param(JOB_ID, "reset_job", id="reset-path"),
+        ],
+    )
+    
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+    def test_injects_airflow_params_when_parameters_missing(self, 
db_mock_class, found_job_id, hook_method):
+        """
+        When ``parameters`` is not set in ``json`` and the operator's 
``params`` dict is
+        non-empty, the operator's ``params`` should be forwarded as job-level
+        ``parameters`` on both the create and reset paths (regression test for
+        GH-39002).
+        """
+        op = DatabricksCreateJobsOperator(
+            task_id=TASK_ID,
+            json={"name": JOB_NAME, "tasks": TASKS},
+            params={"env": "prod", "batch_size": 100},
+        )
+        db_mock = db_mock_class.return_value
+        db_mock.find_job_id_by_name.return_value = found_job_id
+
+        op.execute({})
+
+        # The create-path passes the settings dict directly; the reset-path 
passes
+        # (job_id, settings) — pull the settings out in either case.
+        call_args = getattr(db_mock, hook_method).call_args.args
+        settings = call_args[0] if hook_method == "create_job" else 
call_args[1]
+        assert settings["parameters"] == [
+            {"name": "env", "default": "prod"},
+            {"name": "batch_size", "default": 100},
+        ]
+
+    @pytest.mark.parametrize(
+        ("found_job_id", "hook_method"),
+        [
+            pytest.param(None, "create_job", id="create-path"),
+            pytest.param(JOB_ID, "reset_job", id="reset-path"),
+        ],
+    )
+    
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+    def test_does_not_override_existing_parameters(self, db_mock_class, 
found_job_id, hook_method):
+        """
+        When ``parameters`` is already set in ``json``, the operator's 
``params`` must
+        not override it on either the create or reset paths.
+        """
+        op = DatabricksCreateJobsOperator(
+            task_id=TASK_ID,
+            json={"name": JOB_NAME, "tasks": TASKS, "parameters": JOB_PARAMS},
+            params={"env": "prod"},
+        )
+        db_mock = db_mock_class.return_value
+        db_mock.find_job_id_by_name.return_value = found_job_id
+
+        op.execute({})
+
+        call_args = getattr(db_mock, hook_method).call_args.args
+        settings = call_args[0] if hook_method == "create_job" else 
call_args[1]
+        assert settings["parameters"] == JOB_PARAMS
+
 
 class TestDatabricksSubmitRunOperator:
     def test_init_with_notebook_task_named_parameters(self):
@@ -1183,6 +1245,76 @@ class TestDatabricksSubmitRunOperator:
         assert op.run_id == RUN_ID
         assert not mock_defer.called
 
+    
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+    def test_submit_run_injects_airflow_params_into_notebook_task(self, 
db_mock_class):
+        """
+        For a single notebook_task, ``self.params`` should be injected into
+        ``notebook_task.base_parameters`` (regression test for GH-39002).
+        """
+        op = DatabricksSubmitRunOperator(
+            task_id=TASK_ID,
+            notebook_task={"notebook_path": "/Users/me/notebook"},
+            new_cluster=NEW_CLUSTER,
+            params={"env": "prod", "batch_size": "100"},
+        )
+        db_mock = db_mock_class.return_value
+        db_mock.submit_run.return_value = RUN_ID
+        db_mock.get_run = make_run_with_state_mock("TERMINATED", "SUCCESS")
+
+        op.execute(None)
+
+        actual = db_mock.submit_run.call_args.args[0]
+        assert actual["notebook_task"]["base_parameters"] == {"env": "prod", 
"batch_size": "100"}
+
+    
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+    def 
test_submit_run_injects_airflow_params_into_each_task_in_tasks_list(self, 
db_mock_class):
+        """
+        For multiple ``tasks``, dict-shaped per-task params should be filled 
in for
+        each task that supports them.
+        """
+        op = DatabricksSubmitRunOperator(
+            task_id=TASK_ID,
+            tasks=[
+                {"task_key": "t1", "notebook_task": {"notebook_path": "/n1"}},
+                {"task_key": "t2", "spark_jar_task": {"main_class_name": 
"Foo"}},
+            ],
+            params={"env": "prod"},
+        )
+        db_mock = db_mock_class.return_value
+        db_mock.submit_run.return_value = RUN_ID
+        db_mock.get_run = make_run_with_state_mock("TERMINATED", "SUCCESS")
+
+        op.execute(None)
+
+        actual = db_mock.submit_run.call_args.args[0]
+        assert actual["tasks"][0]["notebook_task"]["base_parameters"] == 
{"env": "prod"}
+        # spark_jar_task only accepts List[str] parameters; skip 
auto-injection.
+        assert "parameters" not in actual["tasks"][1]["spark_jar_task"]
+
+    
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+    def test_submit_run_does_not_override_existing_task_parameters(self, 
db_mock_class):
+        """
+        If a dict-shaped per-task parameter field is already populated, 
``self.params``
+        should not override it.
+        """
+        op = DatabricksSubmitRunOperator(
+            task_id=TASK_ID,
+            notebook_task={
+                "notebook_path": "/Users/me/notebook",
+                "base_parameters": {"explicit": "value"},
+            },
+            new_cluster=NEW_CLUSTER,
+            params={"env": "prod"},
+        )
+        db_mock = db_mock_class.return_value
+        db_mock.submit_run.return_value = RUN_ID
+        db_mock.get_run = make_run_with_state_mock("TERMINATED", "SUCCESS")
+
+        op.execute(None)
+
+        actual = db_mock.submit_run.call_args.args[0]
+        assert actual["notebook_task"]["base_parameters"] == {"explicit": 
"value"}
+
 
 class TestDatabricksRunNowOperator:
     def test_init_with_named_parameters(self):
@@ -1999,6 +2131,48 @@ class TestDatabricksRunNowOperator:
         assert op.run_id == RUN_ID
         assert not mock_defer.called
 
+    
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+    def test_run_now_injects_airflow_params_when_job_parameters_missing(self, 
db_mock_class):
+        """
+        When ``job_parameters`` is not set in ``json`` and the operator's 
``params`` dict is
+        non-empty, the operator's ``params`` should be forwarded as 
``job_parameters``
+        (regression test for GH-39002).
+        """
+        op = DatabricksRunNowOperator(
+            task_id=TASK_ID,
+            job_id=JOB_ID,
+            params={"env": "prod", "batch_size": 100},
+        )
+        db_mock = db_mock_class.return_value
+        db_mock.run_now.return_value = RUN_ID
+        db_mock.get_run = make_run_with_state_mock("TERMINATED", "SUCCESS")
+
+        op.execute(None)
+
+        actual = db_mock.run_now.call_args.args[0]
+        assert actual["job_parameters"] == {"env": "prod", "batch_size": 100}
+
+    
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+    def test_run_now_does_not_override_existing_job_parameters(self, 
db_mock_class):
+        """
+        When ``job_parameters`` is already set in ``json``, the operator's 
``params`` should
+        not override it.
+        """
+        op = DatabricksRunNowOperator(
+            task_id=TASK_ID,
+            job_id=JOB_ID,
+            json={"job_parameters": {"explicit": "value"}},
+            params={"env": "prod"},
+        )
+        db_mock = db_mock_class.return_value
+        db_mock.run_now.return_value = RUN_ID
+        db_mock.get_run = make_run_with_state_mock("TERMINATED", "SUCCESS")
+
+        op.execute(None)
+
+        actual = db_mock.run_now.call_args.args[0]
+        assert actual["job_parameters"] == {"explicit": "value"}
+
 
 class TestDatabricksSQLStatementsOperator:
     def test_init(self):

Reply via email to