This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9eea9a02193 Add OpenLineage Spark conf injection to
DatabricksSubmitRunOperator (#67894)
9eea9a02193 is described below
commit 9eea9a02193173259f105f057de506e90d1fa272
Author: Rahul Madan <[email protected]>
AuthorDate: Tue Jun 23 14:14:17 2026 +0530
Add OpenLineage Spark conf injection to DatabricksSubmitRunOperator (#67894)
* Add OpenLineage parent/transport info injection to
DatabricksSubmitRunOperator
DatabricksSubmitRunOperator did not emit any OpenLineage information. This adds
optional injection of OpenLineage parent job and transport configuration into the
job's ``new_cluster.spark_conf`` (single-task and multi-task forms), so the Spark
job running on Databricks can correlate its lineage events with the Airflow task
and send them to the same backend.
The behaviour is controlled by two new operator parameters,
``openlineage_inject_parent_job_info`` and ``openlineage_inject_transport_info``,
each defaulting to the corresponding ``openlineage.spark_inject_*_info`` config
option, mirroring the existing Dataproc, EMR and Glue operators. Injection is
skipped when the provider is unavailable, when the relevant properties are already
present, or when the job has no ``new_cluster`` to modify (e.g. an existing cluster).
Signed-off-by: rahul-madaan <[email protected]>
* Use debug log level when skipping Databricks OpenLineage injection
without a new_cluster
Signed-off-by: rahul-madaan <[email protected]>
* Inject OpenLineage properties into Databricks for_each_task nested
clusters
Signed-off-by: rahul-madaan <[email protected]>
---------
Signed-off-by: rahul-madaan <[email protected]>
---
.../providers/databricks/operators/databricks.py | 39 +++++
.../providers/databricks/utils/openlineage.py | 139 ++++++++++++++++
.../unit/databricks/operators/test_databricks.py | 133 +++++++++++++++
.../unit/databricks/utils/test_openlineage.py | 178 +++++++++++++++++++++
providers/openlineage/docs/spark.rst | 1 +
5 files changed, 490 insertions(+)
diff --git
a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
index bf743aaf516..4bae6b28f0a 100644
---
a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
+++
b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
@@ -652,6 +652,13 @@ class DatabricksSubmitRunOperator(BaseOperator):
.. seealso::
https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit
+ :param openlineage_inject_parent_job_info: If True, injects OpenLineage
parent job information
+ into the ``new_cluster`` ``spark_conf`` so the Spark job emits a
``parentRunFacet`` linking
+ back to the Airflow task. Defaults to the
+ ``openlineage.spark_inject_parent_job_info`` config value.
+ :param openlineage_inject_transport_info: If True, injects OpenLineage
transport configuration
+ into the ``new_cluster`` ``spark_conf`` so the Spark job sends OL
events to the same backend
+ as Airflow. Defaults to the
``openlineage.spark_inject_transport_info`` config value.
.. note::
If the operator's ``params`` dict is non-empty, it is automatically
forwarded into the
@@ -716,6 +723,12 @@ class DatabricksSubmitRunOperator(BaseOperator):
wait_for_termination: bool = True,
git_source: dict[str, str] | None = None,
deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
+ openlineage_inject_parent_job_info: bool = conf.getboolean(
+ "openlineage", "spark_inject_parent_job_info", fallback=False
+ ),
+ openlineage_inject_transport_info: bool = conf.getboolean(
+ "openlineage", "spark_inject_transport_info", fallback=False
+ ),
**kwargs,
) -> None:
"""Create a new ``DatabricksSubmitRunOperator``."""
@@ -743,6 +756,8 @@ class DatabricksSubmitRunOperator(BaseOperator):
self.databricks_retry_args = databricks_retry_args
self.wait_for_termination = wait_for_termination
self.deferrable = deferrable
+ self.openlineage_inject_parent_job_info =
openlineage_inject_parent_job_info
+ self.openlineage_inject_transport_info =
openlineage_inject_transport_info
# This variable will be used in case our task gets killed.
self.run_id: int | None = None
@@ -830,6 +845,10 @@ class DatabricksSubmitRunOperator(BaseOperator):
else:
_inject_airflow_params_into_task(json, params_dump)
+ if self.openlineage_inject_parent_job_info or
self.openlineage_inject_transport_info:
+ self.log.info("Automatic injection of OpenLineage information into
Spark properties is enabled.")
+ json =
self._inject_openlineage_properties_into_databricks_job(json, context)
+
normalised = cast("dict[str, Any]", normalise_json_content(json))
self.run_id = self._hook.submit_run(normalised)
if self.deferrable:
@@ -837,6 +856,26 @@ class DatabricksSubmitRunOperator(BaseOperator):
else:
_handle_databricks_operator_execution(self, self._hook, self.log,
context)
+ def _inject_openlineage_properties_into_databricks_job(self, json: dict,
context: Context) -> dict:
+ try:
+ from airflow.providers.databricks.utils.openlineage import (
+ inject_openlineage_properties_into_databricks_job,
+ )
+
+ return inject_openlineage_properties_into_databricks_job(
+ job=json,
+ context=context,
+ inject_parent_job_info=self.openlineage_inject_parent_job_info,
+ inject_transport_info=self.openlineage_inject_transport_info,
+ )
+ except Exception as e:
+ self.log.warning(
+ "An error occurred while trying to inject OpenLineage
information. "
+ "Databricks job has not been modified by OpenLineage.",
+ exc_info=e,
+ )
+ return json
+
def on_kill(self):
if self.run_id:
self._hook.cancel_run(self.run_id)
diff --git
a/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py
b/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py
index 58e87a21dd7..706afa5df09 100644
--- a/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py
+++ b/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import copy
import datetime
import json
import logging
@@ -24,12 +25,17 @@ from typing import TYPE_CHECKING, Any
import requests
from airflow.providers.common.compat.openlineage.check import
require_openlineage_version
+from airflow.providers.common.compat.openlineage.utils.spark import (
+ inject_parent_job_information_into_spark_properties,
+ inject_transport_information_into_spark_properties,
+)
from airflow.providers.common.compat.sdk import timezone
if TYPE_CHECKING:
from openlineage.client.event_v2 import RunEvent
from openlineage.client.facet_v2 import JobFacet
+ from airflow.providers.common.compat.sdk import Context
from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.providers.databricks.hooks.databricks_sql import
DatabricksSqlHook
@@ -320,3 +326,136 @@ def emit_openlineage_events_for_databricks_queries(
log.info("OpenLineage has successfully finished processing information
about Databricks queries.")
return
+
+
+def _is_openlineage_provider_accessible() -> bool:
+ """
+ Check if the OpenLineage provider is accessible.
+
+ This function attempts to import the necessary OpenLineage modules and
checks if the provider
+ is enabled and the listener is available.
+
+ Returns:
+ True if the OpenLineage provider is accessible, False otherwise.
+ """
+ try:
+ from airflow.providers.openlineage.conf import is_disabled
+ from airflow.providers.openlineage.plugins.listener import
get_openlineage_listener
+ except ImportError:
+ log.debug("OpenLineage provider could not be imported.")
+ return False
+
+ if is_disabled():
+ log.debug("OpenLineage provider is disabled.")
+ return False
+
+ if not get_openlineage_listener():
+ log.debug("OpenLineage listener could not be found.")
+ return False
+
+ return True
+
+
+def _extract_new_clusters_from_databricks_job(job: dict) -> list[dict]:
+ """
+ Collect every ``new_cluster`` definition that can carry Spark properties
in a Databricks job.
+
+ A ``runs/submit`` payload can define a ``new_cluster`` at the top level
(single-task form), inline
+ on each task (``tasks[].new_cluster``), inside a ``for_each_task``'s
nested task
+ (``tasks[].for_each_task.task.new_cluster``), or as a shared job cluster
+ (``job_clusters[].new_cluster``) referenced by tasks through
``job_cluster_key``. Tasks running on
+ an ``existing_cluster_id`` have no ``new_cluster`` to mutate and are
skipped.
+
+ Args:
+ job: The Databricks ``runs/submit`` job definition.
+
+ Returns:
+ The list of ``new_cluster`` dicts found in the job definition.
+ """
+ new_clusters = []
+ if isinstance(job.get("new_cluster"), dict):
+ new_clusters.append(job["new_cluster"])
+ for key in ("tasks", "job_clusters"):
+ if isinstance(job.get(key), list):
+ new_clusters.extend(
+ item["new_cluster"] for item in job[key] if
isinstance(item.get("new_cluster"), dict)
+ )
+ if isinstance(job.get("tasks"), list):
+ for task in job["tasks"]:
+ # ``for_each_task`` wraps a nested task that can carry its own
``new_cluster``.
+ for_each = task.get("for_each_task") if isinstance(task, dict)
else None
+ nested = for_each.get("task") if isinstance(for_each, dict) else
None
+ if isinstance(nested, dict) and
isinstance(nested.get("new_cluster"), dict):
+ new_clusters.append(nested["new_cluster"])
+ return new_clusters
+
+
+def inject_openlineage_properties_into_databricks_job(
+ job: dict, context: Context, inject_parent_job_info: bool,
inject_transport_info: bool
+) -> dict:
+ """
+ Inject OpenLineage properties into a Databricks job definition.
+
+ This function does not remove existing configurations or modify the job
definition in any way,
+ except to add the required OpenLineage properties if they are not already
present.
+
+ The entire properties injection process will be skipped if any condition
is met:
+ - The OpenLineage provider is not accessible.
+ - The job has no ``new_cluster`` definition to inject Spark properties
into (e.g. it only uses
+ an ``existing_cluster_id``, whose Spark configuration is fixed at
cluster creation time).
+ - Both `inject_parent_job_info` and `inject_transport_info` are set to
False.
+
+ Additionally, specific information will not be injected if relevant
OpenLineage properties already
+ exist.
+
+ Parent job information will not be injected if:
+ - Any property prefixed with `spark.openlineage.parent` exists.
+ - `inject_parent_job_info` is False.
+ Transport information will not be injected if:
+ - Any property prefixed with `spark.openlineage.transport` exists.
+ - `inject_transport_info` is False.
+
+ Args:
+ job: The original Databricks ``runs/submit`` job definition.
+ context: The Airflow context in which the job is running.
+ inject_parent_job_info: Flag indicating whether to inject parent job
information.
+ inject_transport_info: Flag indicating whether to inject transport
information.
+
+ Returns:
+ The modified job definition with OpenLineage properties injected, if
applicable.
+ """
+ if not inject_parent_job_info and not inject_transport_info:
+ log.debug("Automatic injection of OpenLineage information is
disabled.")
+ return job
+
+ if not _is_openlineage_provider_accessible():
+ log.warning(
+ "Could not access OpenLineage provider for automatic OpenLineage "
+ "properties injection. No action will be performed."
+ )
+ return job
+
+ job = copy.deepcopy(job)
+ new_clusters = _extract_new_clusters_from_databricks_job(job)
+ if not new_clusters:
+ log.debug(
+ "Could not find a Databricks `new_cluster` definition for
automatic OpenLineage "
+ "properties injection. No action will be performed."
+ )
+ return job
+
+ for new_cluster in new_clusters:
+ properties = new_cluster.get("spark_conf", {})
+ if inject_parent_job_info:
+ log.debug("Injecting OpenLineage parent job information into Spark
properties.")
+ properties = inject_parent_job_information_into_spark_properties(
+ properties=properties, context=context
+ )
+ if inject_transport_info:
+ log.debug("Injecting OpenLineage transport information into Spark
properties.")
+ properties = inject_transport_information_into_spark_properties(
+ properties=properties, context=context
+ )
+ new_cluster["spark_conf"] = properties
+
+ return job
diff --git
a/providers/databricks/tests/unit/databricks/operators/test_databricks.py
b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
index 3d613712caa..f44f1e9fbe0 100644
--- a/providers/databricks/tests/unit/databricks/operators/test_databricks.py
+++ b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
@@ -1518,6 +1518,139 @@ class TestDatabricksSubmitRunOperator:
db_mock_class.assert_not_called()
+class TestDatabricksSubmitRunOperatorOpenLineageInjection:
+ """Tests for OpenLineage parent job info and transport info injection in
DatabricksSubmitRunOperator."""
+
+ @mock.patch(
+
"airflow.providers.databricks.utils.openlineage.inject_openlineage_properties_into_databricks_job"
+ )
+
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+ def test_inject_parent_job_info_called_when_enabled(self, db_mock_class,
mock_inject):
+ mock_inject.side_effect = lambda job, context, inject_parent_job_info,
inject_transport_info: {
+ **job,
+ "new_cluster": {
+ **job["new_cluster"],
+ "spark_conf": {"spark.openlineage.parentJobNamespace": "ns"},
+ },
+ }
+ op = DatabricksSubmitRunOperator(
+ task_id=TASK_ID,
+ new_cluster=NEW_CLUSTER,
+ notebook_task=NOTEBOOK_TASK,
+ openlineage_inject_parent_job_info=True,
+ )
+ db_mock = db_mock_class.return_value
+ db_mock.submit_run.return_value = RUN_ID
+ db_mock.get_run = make_run_with_state_mock("TERMINATED", "SUCCESS")
+
+ op.execute(None)
+
+ mock_inject.assert_called_once()
+ submitted = db_mock.submit_run.call_args.args[0]
+ assert
submitted["new_cluster"]["spark_conf"]["spark.openlineage.parentJobNamespace"]
== "ns"
+
+ @mock.patch(
+
"airflow.providers.databricks.utils.openlineage.inject_openlineage_properties_into_databricks_job"
+ )
+
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+ def test_inject_parent_job_info_not_called_when_disabled(self,
db_mock_class, mock_inject):
+ op = DatabricksSubmitRunOperator(
+ task_id=TASK_ID,
+ new_cluster=NEW_CLUSTER,
+ notebook_task=NOTEBOOK_TASK,
+ openlineage_inject_parent_job_info=False,
+ )
+ db_mock = db_mock_class.return_value
+ db_mock.submit_run.return_value = RUN_ID
+ db_mock.get_run = make_run_with_state_mock("TERMINATED", "SUCCESS")
+
+ op.execute(None)
+
+ mock_inject.assert_not_called()
+
+ @mock.patch(
+
"airflow.providers.databricks.utils.openlineage.inject_openlineage_properties_into_databricks_job"
+ )
+
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+ def test_inject_transport_info_called_when_enabled(self, db_mock_class,
mock_inject):
+ mock_inject.side_effect = lambda job, context, inject_parent_job_info,
inject_transport_info: {
+ **job,
+ "new_cluster": {**job["new_cluster"], "spark_conf":
{"spark.openlineage.transport.type": "http"}},
+ }
+ op = DatabricksSubmitRunOperator(
+ task_id=TASK_ID,
+ new_cluster=NEW_CLUSTER,
+ notebook_task=NOTEBOOK_TASK,
+ openlineage_inject_transport_info=True,
+ )
+ db_mock = db_mock_class.return_value
+ db_mock.submit_run.return_value = RUN_ID
+ db_mock.get_run = make_run_with_state_mock("TERMINATED", "SUCCESS")
+
+ op.execute(None)
+
+ mock_inject.assert_called_once()
+ submitted = db_mock.submit_run.call_args.args[0]
+ assert
submitted["new_cluster"]["spark_conf"]["spark.openlineage.transport.type"] ==
"http"
+
+ @mock.patch(
+
"airflow.providers.databricks.utils.openlineage.inject_openlineage_properties_into_databricks_job"
+ )
+
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+ def test_inject_both_parent_and_transport_info(self, db_mock_class,
mock_inject):
+ mock_inject.side_effect = lambda job, context, inject_parent_job_info,
inject_transport_info: job
+ op = DatabricksSubmitRunOperator(
+ task_id=TASK_ID,
+ new_cluster=NEW_CLUSTER,
+ notebook_task=NOTEBOOK_TASK,
+ openlineage_inject_parent_job_info=True,
+ openlineage_inject_transport_info=True,
+ )
+ db_mock = db_mock_class.return_value
+ db_mock.submit_run.return_value = RUN_ID
+ db_mock.get_run = make_run_with_state_mock("TERMINATED", "SUCCESS")
+
+ op.execute(None)
+
+ mock_inject.assert_called_once()
+ _, call_kwargs = mock_inject.call_args
+ assert call_kwargs["inject_parent_job_info"] is True
+ assert call_kwargs["inject_transport_info"] is True
+
+ @mock.patch(
+
"airflow.providers.databricks.utils.openlineage.inject_openlineage_properties_into_databricks_job"
+ )
+
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+ def test_inject_parent_job_info_preserves_existing_config(self,
db_mock_class, mock_inject):
+ """Existing ``spark_conf`` entries are preserved alongside the
injected OpenLineage properties."""
+ new_cluster = {**NEW_CLUSTER, "spark_conf": {"spark.executor.memory":
"8g"}}
+ mock_inject.side_effect = lambda job, context, inject_parent_job_info,
inject_transport_info: {
+ **job,
+ "new_cluster": {
+ **job["new_cluster"],
+ "spark_conf": {
+ **job["new_cluster"]["spark_conf"],
+ "spark.openlineage.parentJobNamespace": "ns",
+ },
+ },
+ }
+ op = DatabricksSubmitRunOperator(
+ task_id=TASK_ID,
+ new_cluster=new_cluster,
+ notebook_task=NOTEBOOK_TASK,
+ openlineage_inject_parent_job_info=True,
+ )
+ db_mock = db_mock_class.return_value
+ db_mock.submit_run.return_value = RUN_ID
+ db_mock.get_run = make_run_with_state_mock("TERMINATED", "SUCCESS")
+
+ op.execute(None)
+
+ submitted = db_mock.submit_run.call_args.args[0]
+ assert submitted["new_cluster"]["spark_conf"]["spark.executor.memory"]
== "8g"
+ assert
submitted["new_cluster"]["spark_conf"]["spark.openlineage.parentJobNamespace"]
== "ns"
+
+
class TestDatabricksRunNowOperator:
def test_init_with_named_parameters(self):
"""
diff --git
a/providers/databricks/tests/unit/databricks/utils/test_openlineage.py
b/providers/databricks/tests/unit/databricks/utils/test_openlineage.py
index 8702456ca86..cefa881c7ec 100644
--- a/providers/databricks/tests/unit/databricks/utils/test_openlineage.py
+++ b/providers/databricks/tests/unit/databricks/utils/test_openlineage.py
@@ -34,11 +34,13 @@ from airflow.providers.databricks.hooks.databricks import
DatabricksHook
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
from airflow.providers.databricks.utils.openlineage import (
_create_ol_event_pair,
+ _extract_new_clusters_from_databricks_job,
_get_parent_run_facet,
_get_queries_details_from_databricks,
_process_data_from_api,
_run_api_call,
emit_openlineage_events_for_databricks_queries,
+ inject_openlineage_properties_into_databricks_job,
)
from airflow.providers.openlineage.conf import namespace
from airflow.utils.state import TaskInstanceState
@@ -1196,3 +1198,179 @@ def
test_emit_openlineage_events_with_old_openlineage_provider(mock_version):
)
assert query_ids == original_query_ids # Verify that the input
query_ids list is unchanged.
fake_adapter.emit.assert_not_called() # No events should be emitted
+
+
+OL_UTILS = "airflow.providers.databricks.utils.openlineage"
+
+
+def test_extract_new_clusters_from_databricks_job():
+ top_cluster = {"spark_version": "13.3.x-scala2.12"}
+ task_cluster = {"spark_version": "14.3.x-scala2.12"}
+ job_cluster = {"spark_version": "15.4.x-scala2.12"}
+ for_each_cluster = {"spark_version": "16.4.x-scala2.12"}
+ job = {
+ "new_cluster": top_cluster,
+ "tasks": [
+ {"task_key": "a", "new_cluster": task_cluster},
+ {"task_key": "b", "existing_cluster_id": "existing"},
+ {"task_key": "c", "job_cluster_key": "shared"},
+ {
+ "task_key": "d",
+ "for_each_task": {"task": {"task_key": "d/iter",
"new_cluster": for_each_cluster}},
+ },
+ ],
+ "job_clusters": [{"job_cluster_key": "shared", "new_cluster":
job_cluster}],
+ }
+ assert _extract_new_clusters_from_databricks_job(job) == [
+ top_cluster,
+ task_cluster,
+ job_cluster,
+ for_each_cluster,
+ ]
+
+
+def test_extract_new_clusters_from_databricks_job_existing_cluster_only():
+ assert _extract_new_clusters_from_databricks_job({"existing_cluster_id":
"existing"}) == []
+
+
+@mock.patch(f"{OL_UTILS}._is_openlineage_provider_accessible", return_value=True)
+def test_inject_openlineage_properties_disabled_returns_job_unchanged(mock_accessible):
+ job = {"new_cluster": {"spark_conf": {}}}
+ result = inject_openlineage_properties_into_databricks_job(
+ job, context=mock.MagicMock(), inject_parent_job_info=False,
inject_transport_info=False
+ )
+ assert result == job
+ mock_accessible.assert_not_called()
+
+
+@mock.patch(f"{OL_UTILS}._is_openlineage_provider_accessible", return_value=False)
+def test_inject_openlineage_properties_provider_inaccessible_returns_job_unchanged(mock_accessible):
+ job = {"new_cluster": {"spark_conf": {}}}
+ result = inject_openlineage_properties_into_databricks_job(
+ job, context=mock.MagicMock(), inject_parent_job_info=True,
inject_transport_info=True
+ )
+ assert result == job
+
+
+@mock.patch(f"{OL_UTILS}._is_openlineage_provider_accessible", return_value=True)
+def test_inject_openlineage_properties_no_new_cluster_returns_job_unchanged(mock_accessible):
+ job = {"existing_cluster_id": "existing"}
+ result = inject_openlineage_properties_into_databricks_job(
+ job, context=mock.MagicMock(), inject_parent_job_info=True,
inject_transport_info=True
+ )
+ assert result == job
+
+
+@mock.patch(f"{OL_UTILS}.inject_transport_information_into_spark_properties")
+@mock.patch(f"{OL_UTILS}.inject_parent_job_information_into_spark_properties")
+@mock.patch(f"{OL_UTILS}._is_openlineage_provider_accessible", return_value=True)
+def test_inject_openlineage_properties_into_top_level_new_cluster(
+ mock_accessible, mock_parent, mock_transport
+):
+ mock_parent.side_effect = lambda properties, context: {
+ **properties,
+ "spark.openlineage.parentJobName": "dag_id.task_id",
+ }
+ mock_transport.side_effect = lambda properties, context: {
+ **properties,
+ "spark.openlineage.transport.type": "http",
+ }
+ job = {"new_cluster": {"spark_conf": {"spark.executor.memory": "8g"}},
"notebook_task": {"x": "y"}}
+
+ result = inject_openlineage_properties_into_databricks_job(
+ job, context=mock.MagicMock(), inject_parent_job_info=True,
inject_transport_info=True
+ )
+
+ spark_conf = result["new_cluster"]["spark_conf"]
+ assert spark_conf["spark.executor.memory"] == "8g"
+ assert spark_conf["spark.openlineage.parentJobName"] == "dag_id.task_id"
+ assert spark_conf["spark.openlineage.transport.type"] == "http"
+
+
+@mock.patch(f"{OL_UTILS}.inject_parent_job_information_into_spark_properties")
+@mock.patch(f"{OL_UTILS}._is_openlineage_provider_accessible", return_value=True)
+def test_inject_openlineage_properties_into_multi_task_new_clusters(mock_accessible, mock_parent):
+ mock_parent.side_effect = lambda properties, context: {
+ **properties,
+ "spark.openlineage.parentJobName": "dag_id.task_id",
+ }
+ job = {
+ "tasks": [
+ {"task_key": "a", "new_cluster": {"spark_conf": {}}},
+ {"task_key": "b", "new_cluster": {"spark_conf": {}}},
+ {"task_key": "c", "existing_cluster_id": "existing"},
+ ]
+ }
+
+ result = inject_openlineage_properties_into_databricks_job(
+ job, context=mock.MagicMock(), inject_parent_job_info=True,
inject_transport_info=False
+ )
+
+ assert
result["tasks"][0]["new_cluster"]["spark_conf"]["spark.openlineage.parentJobName"]
== (
+ "dag_id.task_id"
+ )
+ assert
result["tasks"][1]["new_cluster"]["spark_conf"]["spark.openlineage.parentJobName"]
== (
+ "dag_id.task_id"
+ )
+ assert result["tasks"][2] == {"task_key": "c", "existing_cluster_id":
"existing"}
+
+
+@mock.patch(f"{OL_UTILS}.inject_parent_job_information_into_spark_properties")
+@mock.patch(f"{OL_UTILS}._is_openlineage_provider_accessible", return_value=True)
+def test_inject_openlineage_properties_into_shared_job_clusters(mock_accessible, mock_parent):
+ mock_parent.side_effect = lambda properties, context: {
+ **properties,
+ "spark.openlineage.parentJobName": "dag_id.task_id",
+ }
+ job = {
+ "tasks": [{"task_key": "a", "job_cluster_key": "shared"}],
+ "job_clusters": [{"job_cluster_key": "shared", "new_cluster":
{"spark_conf": {}}}],
+ }
+
+ result = inject_openlineage_properties_into_databricks_job(
+ job, context=mock.MagicMock(), inject_parent_job_info=True,
inject_transport_info=False
+ )
+
+ injected =
result["job_clusters"][0]["new_cluster"]["spark_conf"]["spark.openlineage.parentJobName"]
+ assert injected == "dag_id.task_id"
+
+
+@mock.patch(f"{OL_UTILS}.inject_parent_job_information_into_spark_properties")
+@mock.patch(f"{OL_UTILS}._is_openlineage_provider_accessible", return_value=True)
+def test_inject_openlineage_properties_into_for_each_task_new_cluster(mock_accessible, mock_parent):
+ mock_parent.side_effect = lambda properties, context: {
+ **properties,
+ "spark.openlineage.parentJobName": "dag_id.task_id",
+ }
+ job = {
+ "tasks": [
+ {
+ "task_key": "a",
+ "for_each_task": {"task": {"task_key": "a/iter",
"new_cluster": {"spark_conf": {}}}},
+ }
+ ]
+ }
+
+ result = inject_openlineage_properties_into_databricks_job(
+ job, context=mock.MagicMock(), inject_parent_job_info=True,
inject_transport_info=False
+ )
+
+ injected =
result["tasks"][0]["for_each_task"]["task"]["new_cluster"]["spark_conf"]
+ assert injected["spark.openlineage.parentJobName"] == "dag_id.task_id"
+
+
+@mock.patch(f"{OL_UTILS}.inject_parent_job_information_into_spark_properties")
+@mock.patch(f"{OL_UTILS}._is_openlineage_provider_accessible", return_value=True)
+def test_inject_openlineage_properties_does_not_mutate_input(mock_accessible, mock_parent):
+ mock_parent.side_effect = lambda properties, context: {
+ **properties,
+ "spark.openlineage.parentJobName": "dag_id.task_id",
+ }
+ job = {"new_cluster": {"spark_conf": {"spark.executor.memory": "8g"}}}
+ original = copy.deepcopy(job)
+
+ inject_openlineage_properties_into_databricks_job(
+ job, context=mock.MagicMock(), inject_parent_job_info=True,
inject_transport_info=False
+ )
+
+ assert job == original
diff --git a/providers/openlineage/docs/spark.rst
b/providers/openlineage/docs/spark.rst
index f4301525fee..0dbdf967c14 100644
--- a/providers/openlineage/docs/spark.rst
+++ b/providers/openlineage/docs/spark.rst
@@ -96,6 +96,7 @@ Automatic injection is supported for the following operators:
- :class:`~airflow.providers.apache.livy.operators.livy.LivyOperator`
-
:class:`~airflow.providers.apache.spark.operators.spark_submit.SparkSubmitOperator`
+-
:class:`~airflow.providers.databricks.operators.databricks.DatabricksSubmitRunOperator`
-
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateBatchOperator`
-
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator`
-
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`