This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c2f83ea7c97 Add OpenLineage parent and transport info injection to
`EmrServerlessStartJobOperator` (#64807)
c2f83ea7c97 is described below
commit c2f83ea7c973aef140b732d17fd720024644ef6e
Author: Rahul Madan <[email protected]>
AuthorDate: Tue Apr 7 13:48:30 2026 +0530
Add OpenLineage parent and transport info injection to
`EmrServerlessStartJobOperator` (#64807)
Signed-off-by: Rahul Madan <[email protected]>
---
.../airflow/providers/amazon/aws/operators/emr.py | 37 +++-
.../amazon/aws/operators/test_emr_serverless.py | 221 +++++++++++++++++++++
.../common/compat/openlineage/utils/spark.py | 26 +++
.../airflow/providers/openlineage/utils/spark.py | 90 +++++++++
.../tests/unit/openlineage/utils/test_spark.py | 145 ++++++++++++++
5 files changed, 517 insertions(+), 2 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py
b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py
index 313cc649489..8efc71289e4 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py
@@ -58,6 +58,10 @@ from airflow.providers.amazon.aws.utils.waiter import (
)
from airflow.providers.amazon.aws.utils.waiter_with_logging import wait
from airflow.providers.amazon.version_compat import NOTSET, ArgNotSet
+from airflow.providers.common.compat.openlineage.utils.spark import (
+ inject_parent_job_information_into_emr_serverless_properties,
+ inject_transport_information_into_emr_serverless_properties,
+)
from airflow.providers.common.compat.sdk import AirflowException, conf
from airflow.utils.helpers import exactly_one, prune_dict
@@ -1197,6 +1201,14 @@ class
EmrServerlessStartJobOperator(AwsBaseOperator[EmrServerlessHook]):
:param cancel_on_kill: If True, the EMR Serverless job will be cancelled
when the task is killed
while in deferrable mode. This ensures that orphan jobs are not left
running in EMR Serverless
when an Airflow task is cancelled. Defaults to True.
+ :param openlineage_inject_parent_job_info: If True, injects OpenLineage
parent job information
+ into the EMR Serverless ``spark-defaults`` configuration so the Spark
job emits a
+ ``parentRunFacet`` linking back to the Airflow task. Defaults to the
+ ``openlineage.spark_inject_parent_job_info`` config value.
+ :param openlineage_inject_transport_info: If True, injects OpenLineage
transport configuration
+ into the EMR Serverless ``spark-defaults`` configuration so the Spark
job sends OL events
+ to the same backend as Airflow. Defaults to the
+ ``openlineage.spark_inject_transport_info`` config value.
"""
aws_hook_class = EmrServerlessHook
@@ -1236,6 +1248,12 @@ class
EmrServerlessStartJobOperator(AwsBaseOperator[EmrServerlessHook]):
deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
enable_application_ui_links: bool = False,
cancel_on_kill: bool = True,
+ openlineage_inject_parent_job_info: bool = conf.getboolean(
+ "openlineage", "spark_inject_parent_job_info", fallback=False
+ ),
+ openlineage_inject_transport_info: bool = conf.getboolean(
+ "openlineage", "spark_inject_transport_info", fallback=False
+ ),
**kwargs,
):
waiter_delay = 60 if waiter_delay is NOTSET else waiter_delay
@@ -1254,6 +1272,8 @@ class
EmrServerlessStartJobOperator(AwsBaseOperator[EmrServerlessHook]):
self.deferrable = deferrable
self.enable_application_ui_links = enable_application_ui_links
self.cancel_on_kill = cancel_on_kill
+ self.openlineage_inject_parent_job_info =
openlineage_inject_parent_job_info
+ self.openlineage_inject_transport_info =
openlineage_inject_transport_info
super().__init__(**kwargs)
self.client_request_token = client_request_token or str(uuid4())
@@ -1287,6 +1307,19 @@ class
EmrServerlessStartJobOperator(AwsBaseOperator[EmrServerlessHook]):
)
self.log.info("Starting job on Application: %s", self.application_id)
self.name = self.name or self.config.pop("name",
f"emr_serverless_job_airflow_{uuid4()}")
+
+ configuration_overrides = self.configuration_overrides
+ if self.openlineage_inject_parent_job_info:
+ self.log.info("Injecting OpenLineage parent job information into
EMR Serverless configuration.")
+ configuration_overrides =
inject_parent_job_information_into_emr_serverless_properties(
+ configuration_overrides, context
+ )
+ if self.openlineage_inject_transport_info:
+ self.log.info("Injecting OpenLineage transport information into
EMR Serverless configuration.")
+ configuration_overrides =
inject_transport_information_into_emr_serverless_properties(
+ configuration_overrides, context
+ )
+
args = {
"clientToken": self.client_request_token,
"applicationId": self.application_id,
@@ -1295,8 +1328,8 @@ class
EmrServerlessStartJobOperator(AwsBaseOperator[EmrServerlessHook]):
"name": self.name,
**self.config,
}
- if self.configuration_overrides is not None:
- args["configurationOverrides"] = self.configuration_overrides
+ if configuration_overrides is not None:
+ args["configurationOverrides"] = configuration_overrides
response = self.hook.conn.start_job_run(
**args,
)
diff --git
a/providers/amazon/tests/unit/amazon/aws/operators/test_emr_serverless.py
b/providers/amazon/tests/unit/amazon/aws/operators/test_emr_serverless.py
index 62cccc528d0..c851114e665 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_emr_serverless.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_emr_serverless.py
@@ -1513,3 +1513,224 @@ class TestEmrServerlessStopOperator:
)
validate_template_fields(operator)
+
+
+class TestEmrServerlessStartJobOperatorOpenLineageInjection:
+ """Tests for OpenLineage parent job info and transport info injection in
EmrServerlessStartJobOperator."""
+
+ @mock.patch.object(EmrServerlessHook, "get_waiter")
+ @mock.patch.object(EmrServerlessHook, "conn")
+ @mock.patch(
+ "airflow.providers.amazon.aws.operators.emr"
+ ".inject_parent_job_information_into_emr_serverless_properties"
+ )
+ def test_inject_parent_job_info_called_when_enabled(self,
mock_inject_parent, mock_conn, mock_get_waiter):
+ mock_inject_parent.side_effect = lambda overrides, ctx: {
+ "applicationConfiguration": [
+ {
+ "classification": "spark-defaults",
+ "properties": {"spark.openlineage.parentJobNamespace":
"ns"},
+ }
+ ]
+ }
+ mock_conn.get_application.return_value = {"application": {"state":
"STARTED"}}
+ mock_conn.start_job_run.return_value = {
+ "jobRunId": job_run_id,
+ "ResponseMetadata": {"HTTPStatusCode": 200},
+ }
+
+ operator = EmrServerlessStartJobOperator(
+ task_id=task_id,
+ application_id=application_id,
+ execution_role_arn=execution_role_arn,
+ job_driver=job_driver,
+ wait_for_completion=False,
+ openlineage_inject_parent_job_info=True,
+ )
+ operator.execute(mock.MagicMock())
+
+ mock_inject_parent.assert_called_once()
+ call_kwargs = mock_conn.start_job_run.call_args.kwargs
+ config_overrides = call_kwargs["configurationOverrides"]
+ assert (
+ config_overrides["applicationConfiguration"][0]["properties"][
+ "spark.openlineage.parentJobNamespace"
+ ]
+ == "ns"
+ )
+
+ @mock.patch.object(EmrServerlessHook, "get_waiter")
+ @mock.patch.object(EmrServerlessHook, "conn")
+ @mock.patch(
+ "airflow.providers.amazon.aws.operators.emr"
+ ".inject_parent_job_information_into_emr_serverless_properties"
+ )
+ def test_inject_parent_job_info_not_called_when_disabled(
+ self, mock_inject_parent, mock_conn, mock_get_waiter
+ ):
+ mock_conn.get_application.return_value = {"application": {"state":
"STARTED"}}
+ mock_conn.start_job_run.return_value = {
+ "jobRunId": job_run_id,
+ "ResponseMetadata": {"HTTPStatusCode": 200},
+ }
+
+ operator = EmrServerlessStartJobOperator(
+ task_id=task_id,
+ application_id=application_id,
+ execution_role_arn=execution_role_arn,
+ job_driver=job_driver,
+ wait_for_completion=False,
+ openlineage_inject_parent_job_info=False,
+ )
+ operator.execute(mock.MagicMock())
+
+ mock_inject_parent.assert_not_called()
+
+ @mock.patch.object(EmrServerlessHook, "get_waiter")
+ @mock.patch.object(EmrServerlessHook, "conn")
+ @mock.patch(
+ "airflow.providers.amazon.aws.operators.emr"
+ ".inject_transport_information_into_emr_serverless_properties"
+ )
+ def test_inject_transport_info_called_when_enabled(
+ self, mock_inject_transport, mock_conn, mock_get_waiter
+ ):
+ mock_inject_transport.side_effect = lambda overrides, ctx: {
+ "applicationConfiguration": [
+ {
+ "classification": "spark-defaults",
+ "properties": {"spark.openlineage.transport.type": "http"},
+ }
+ ]
+ }
+ mock_conn.get_application.return_value = {"application": {"state":
"STARTED"}}
+ mock_conn.start_job_run.return_value = {
+ "jobRunId": job_run_id,
+ "ResponseMetadata": {"HTTPStatusCode": 200},
+ }
+
+ operator = EmrServerlessStartJobOperator(
+ task_id=task_id,
+ application_id=application_id,
+ execution_role_arn=execution_role_arn,
+ job_driver=job_driver,
+ wait_for_completion=False,
+ openlineage_inject_transport_info=True,
+ )
+ operator.execute(mock.MagicMock())
+
+ mock_inject_transport.assert_called_once()
+ call_kwargs = mock_conn.start_job_run.call_args.kwargs
+ config_overrides = call_kwargs["configurationOverrides"]
+ assert (
+
config_overrides["applicationConfiguration"][0]["properties"]["spark.openlineage.transport.type"]
+ == "http"
+ )
+
+ @mock.patch.object(EmrServerlessHook, "get_waiter")
+ @mock.patch.object(EmrServerlessHook, "conn")
+ @mock.patch(
+ "airflow.providers.amazon.aws.operators.emr"
+ ".inject_parent_job_information_into_emr_serverless_properties"
+ )
+ @mock.patch(
+ "airflow.providers.amazon.aws.operators.emr"
+ ".inject_transport_information_into_emr_serverless_properties"
+ )
+ def test_inject_both_parent_and_transport_info(
+ self, mock_inject_transport, mock_inject_parent, mock_conn,
mock_get_waiter
+ ):
+ mock_inject_parent.side_effect = lambda overrides, ctx: {
+ "applicationConfiguration": [
+ {
+ "classification": "spark-defaults",
+ "properties": {"spark.openlineage.parentJobNamespace":
"ns"},
+ }
+ ]
+ }
+ mock_inject_transport.side_effect = lambda overrides, ctx: {
+ "applicationConfiguration": [
+ {
+ "classification": "spark-defaults",
+ "properties": {
+ **overrides.get("applicationConfiguration",
[{}])[0].get("properties", {}),
+ "spark.openlineage.transport.type": "http",
+ },
+ }
+ ]
+ }
+ mock_conn.get_application.return_value = {"application": {"state":
"STARTED"}}
+ mock_conn.start_job_run.return_value = {
+ "jobRunId": job_run_id,
+ "ResponseMetadata": {"HTTPStatusCode": 200},
+ }
+
+ operator = EmrServerlessStartJobOperator(
+ task_id=task_id,
+ application_id=application_id,
+ execution_role_arn=execution_role_arn,
+ job_driver=job_driver,
+ wait_for_completion=False,
+ openlineage_inject_parent_job_info=True,
+ openlineage_inject_transport_info=True,
+ )
+ operator.execute(mock.MagicMock())
+
+ mock_inject_parent.assert_called_once()
+ mock_inject_transport.assert_called_once()
+
+ @mock.patch.object(EmrServerlessHook, "get_waiter")
+ @mock.patch.object(EmrServerlessHook, "conn")
+ @mock.patch(
+ "airflow.providers.amazon.aws.operators.emr"
+ ".inject_parent_job_information_into_emr_serverless_properties"
+ )
+ def test_inject_parent_job_info_preserves_existing_config(
+ self, mock_inject_parent, mock_conn, mock_get_waiter
+ ):
+ """Existing configuration_overrides (e.g. monitoringConfiguration) are
preserved."""
+ existing_config = {
+ "monitoringConfiguration": {"s3MonitoringConfiguration":
{"logUri": "s3://bucket/logs"}},
+ "applicationConfiguration": [
+ {"classification": "spark-defaults", "properties":
{"spark.driver.memory": "8G"}}
+ ],
+ }
+ mock_inject_parent.side_effect = lambda overrides, ctx: {
+ **overrides,
+ "applicationConfiguration": [
+ {
+ "classification": "spark-defaults",
+ "properties": {
+
**overrides["applicationConfiguration"][0]["properties"],
+ "spark.openlineage.parentJobNamespace": "ns",
+ },
+ }
+ ],
+ }
+ mock_conn.get_application.return_value = {"application": {"state":
"STARTED"}}
+ mock_conn.start_job_run.return_value = {
+ "jobRunId": job_run_id,
+ "ResponseMetadata": {"HTTPStatusCode": 200},
+ }
+
+ operator = EmrServerlessStartJobOperator(
+ task_id=task_id,
+ application_id=application_id,
+ execution_role_arn=execution_role_arn,
+ job_driver=job_driver,
+ configuration_overrides=existing_config,
+ wait_for_completion=False,
+ openlineage_inject_parent_job_info=True,
+ )
+ operator.execute(mock.MagicMock())
+
+ call_kwargs = mock_conn.start_job_run.call_args.kwargs
+ config_overrides = call_kwargs["configurationOverrides"]
+ # Monitoring config preserved
+ assert
config_overrides["monitoringConfiguration"]["s3MonitoringConfiguration"]["logUri"]
== (
+ "s3://bucket/logs"
+ )
+ # OL parent info injected
+ props = config_overrides["applicationConfiguration"][0]["properties"]
+ assert props["spark.openlineage.parentJobNamespace"] == "ns"
+ assert props["spark.driver.memory"] == "8G"
diff --git
a/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
b/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
index d92dad56dad..91b4fae05d5 100644
---
a/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
+++
b/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
@@ -24,16 +24,20 @@ log = logging.getLogger(__name__)
if TYPE_CHECKING:
from airflow.providers.openlineage.utils.spark import (
+ inject_parent_job_information_into_emr_serverless_properties,
inject_parent_job_information_into_glue_arguments,
inject_parent_job_information_into_spark_properties,
+ inject_transport_information_into_emr_serverless_properties,
inject_transport_information_into_glue_arguments,
inject_transport_information_into_spark_properties,
)
from airflow.sdk import Context
try:
from airflow.providers.openlineage.utils.spark import (
+ inject_parent_job_information_into_emr_serverless_properties,
inject_parent_job_information_into_glue_arguments,
inject_parent_job_information_into_spark_properties,
+ inject_transport_information_into_emr_serverless_properties,
inject_transport_information_into_glue_arguments,
inject_transport_information_into_spark_properties,
)
@@ -67,10 +71,32 @@ except ImportError:
)
return script_args
+ def inject_parent_job_information_into_emr_serverless_properties(
+ configuration_overrides: dict | None, context: Context
+ ) -> dict:
+ log.warning(
+ "Could not import `airflow.providers.openlineage.plugins.macros`."
+ "Skipping the injection of OpenLineage parent job information into
"
+ "EMR Serverless configuration."
+ )
+ return configuration_overrides or {}
+
+ def inject_transport_information_into_emr_serverless_properties(
+ configuration_overrides: dict | None, context: Context
+ ) -> dict:
+        log.warning(
+            "Could not import `airflow.providers.openlineage.plugins.listener`. "
+            "Skipping the injection of OpenLineage transport information into "
+            "EMR Serverless configuration."
+        )
+ )
+ return configuration_overrides or {}
+
__all__ = [
+ "inject_parent_job_information_into_emr_serverless_properties",
"inject_parent_job_information_into_glue_arguments",
"inject_parent_job_information_into_spark_properties",
+ "inject_transport_information_into_emr_serverless_properties",
"inject_transport_information_into_glue_arguments",
"inject_transport_information_into_spark_properties",
]
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
b/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
index 837946fecbe..4b67d9383d0 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
@@ -265,3 +265,93 @@ def
inject_transport_information_into_glue_arguments(script_args: dict, context:
combined_conf = f"{existing_conf} --conf {new_conf_parts}" if
existing_conf else new_conf_parts
return {**script_args, "--conf": combined_conf}
+
+
+def _get_or_create_spark_defaults_properties(configuration_overrides: dict) ->
dict:
+ """
+ Find or create the ``spark-defaults`` classification entry and return its
``properties`` dict.
+
+ The ``configuration_overrides`` structure for EMR Serverless is::
+
+ {"applicationConfiguration": [{"classification": "spark-defaults",
"properties": {...}}, ...]}
+
+ If no ``spark-defaults`` entry exists, one is created.
+ """
+ app_config =
configuration_overrides.setdefault("applicationConfiguration", [])
+ for entry in app_config:
+ if entry.get("classification") == "spark-defaults":
+ entry.setdefault("properties", {})
+ return entry["properties"]
+ new_entry: dict = {"classification": "spark-defaults", "properties": {}}
+ app_config.append(new_entry)
+ return new_entry["properties"]
+
+
+def inject_parent_job_information_into_emr_serverless_properties(
+ configuration_overrides: dict | None, context: Context
+) -> dict:
+ """
+ Inject parent job information into EMR Serverless configuration if not
already present.
+
+ EMR Serverless passes Spark properties via a nested
``applicationConfiguration``
+ structure with ``classification: spark-defaults``.
+
+ Args:
+ configuration_overrides: EMR Serverless configuration overrides dict
(may be None).
+ context: The context containing task instance information.
+
+ Returns:
+ Modified configuration_overrides dict with OpenLineage parent job
information injected.
+ """
+ import copy
+
+ result = copy.deepcopy(configuration_overrides) if configuration_overrides
else {}
+ properties = _get_or_create_spark_defaults_properties(result)
+
+ if _is_parent_job_information_present_in_spark_properties(properties):
+ log.info(
+ "Some OpenLineage properties with parent job information are
already present "
+ "in EMR Serverless Spark properties. Skipping the injection of
OpenLineage "
+ "parent job information into EMR Serverless configuration."
+ )
+ return result
+
+ parent_props = _get_parent_job_information_as_spark_properties(context)
+ if parent_props:
+ properties.update(parent_props)
+ return result
+
+
+def inject_transport_information_into_emr_serverless_properties(
+ configuration_overrides: dict | None, context: Context
+) -> dict:
+ """
+ Inject transport information into EMR Serverless configuration if not
already present.
+
+ EMR Serverless passes Spark properties via a nested
``applicationConfiguration``
+ structure with ``classification: spark-defaults``.
+
+ Args:
+ configuration_overrides: EMR Serverless configuration overrides dict
(may be None).
+ context: The context containing task instance information.
+
+ Returns:
+ Modified configuration_overrides dict with OpenLineage transport
information injected.
+ """
+ import copy
+
+ result = copy.deepcopy(configuration_overrides) if configuration_overrides
else {}
+ properties = _get_or_create_spark_defaults_properties(result)
+
+ if _is_transport_information_present_in_spark_properties(properties):
+ log.info(
+ "Some OpenLineage properties with transport information are
already present "
+ "in EMR Serverless Spark properties. Skipping the injection of
OpenLineage "
+ "transport information into EMR Serverless configuration."
+ )
+ return result
+
+ transport_props = _get_transport_information_as_spark_properties()
+ if transport_props:
+ properties.update(transport_props)
+ return result
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_spark.py
b/providers/openlineage/tests/unit/openlineage/utils/test_spark.py
index be835d6a429..8ffea2dbd0d 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_spark.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_spark.py
@@ -31,8 +31,10 @@ from airflow.providers.openlineage.utils.spark import (
_get_transport_information_as_spark_properties,
_is_parent_job_information_present_in_spark_properties,
_is_transport_information_present_in_spark_properties,
+ inject_parent_job_information_into_emr_serverless_properties,
inject_parent_job_information_into_glue_arguments,
inject_parent_job_information_into_spark_properties,
+ inject_transport_information_into_emr_serverless_properties,
inject_transport_information_into_glue_arguments,
inject_transport_information_into_spark_properties,
)
@@ -462,3 +464,146 @@ def
test_inject_transport_information_into_glue_arguments_skips_if_already_prese
result = inject_transport_information_into_glue_arguments(script_args,
EXAMPLE_CONTEXT)
assert result["--conf"] == existing
fake_listener.adapter.get_or_create_openlineage_client.assert_not_called()
+
+
+# ---------------------------------------------------------------------------
+# EMR Serverless configuration injection tests
+# ---------------------------------------------------------------------------
+
+
+@patch("airflow.providers.openlineage.utils.spark._get_parent_job_information_as_spark_properties")
+def
test_inject_parent_job_information_into_emr_serverless_properties_none_overrides(mock_get_parent):
+ """When configuration_overrides is None, creates the full structure with
injected properties."""
+ mock_get_parent.return_value = {
+ "spark.openlineage.parentJobNamespace": "ns",
+ "spark.openlineage.parentJobName": "dag.task",
+ }
+ result =
inject_parent_job_information_into_emr_serverless_properties(None,
EXAMPLE_CONTEXT)
+ assert "applicationConfiguration" in result
+ spark_defaults = result["applicationConfiguration"][0]
+ assert spark_defaults["classification"] == "spark-defaults"
+ assert
spark_defaults["properties"]["spark.openlineage.parentJobNamespace"] == "ns"
+ assert spark_defaults["properties"]["spark.openlineage.parentJobName"] ==
"dag.task"
+
+
+@patch("airflow.providers.openlineage.utils.spark._get_parent_job_information_as_spark_properties")
+def
test_inject_parent_job_information_into_emr_serverless_properties_existing_spark_defaults(
+ mock_get_parent,
+):
+ """Appends to existing spark-defaults properties."""
+ mock_get_parent.return_value = {"spark.openlineage.parentJobNamespace":
"ns"}
+ config = {
+ "applicationConfiguration": [
+ {
+ "classification": "spark-defaults",
+ "properties": {"spark.driver.memory": "8G"},
+ }
+ ]
+ }
+ result =
inject_parent_job_information_into_emr_serverless_properties(config,
EXAMPLE_CONTEXT)
+ props = result["applicationConfiguration"][0]["properties"]
+ assert props["spark.driver.memory"] == "8G"
+ assert props["spark.openlineage.parentJobNamespace"] == "ns"
+
+
+@patch("airflow.providers.openlineage.utils.spark._get_parent_job_information_as_spark_properties")
+def
test_inject_parent_job_information_into_emr_serverless_properties_no_spark_defaults(mock_get_parent):
+ """Creates spark-defaults entry when only other classifications exist."""
+ mock_get_parent.return_value = {"spark.openlineage.parentJobNamespace":
"ns"}
+ config = {
+ "applicationConfiguration": [
+ {"classification": "spark-env", "properties": {"PYSPARK_PYTHON":
"/usr/bin/python3"}}
+ ]
+ }
+ result =
inject_parent_job_information_into_emr_serverless_properties(config,
EXAMPLE_CONTEXT)
+ # Original entry preserved
+ assert result["applicationConfiguration"][0]["classification"] ==
"spark-env"
+ # New spark-defaults entry added
+ spark_defaults = result["applicationConfiguration"][1]
+ assert spark_defaults["classification"] == "spark-defaults"
+ assert
spark_defaults["properties"]["spark.openlineage.parentJobNamespace"] == "ns"
+
+
+@patch("airflow.providers.openlineage.utils.spark._get_parent_job_information_as_spark_properties")
+def
test_inject_parent_job_information_into_emr_serverless_properties_skips_if_already_present(
+ mock_get_parent,
+):
+ """Injection is skipped when parent job info is already in spark-defaults
properties."""
+ config = {
+ "applicationConfiguration": [
+ {
+ "classification": "spark-defaults",
+ "properties": {"spark.openlineage.parentJobNamespace":
"already_there"},
+ }
+ ]
+ }
+ result =
inject_parent_job_information_into_emr_serverless_properties(config,
EXAMPLE_CONTEXT)
+ assert
result["applicationConfiguration"][0]["properties"]["spark.openlineage.parentJobNamespace"]
== (
+ "already_there"
+ )
+ mock_get_parent.assert_not_called()
+
+
+@patch("airflow.providers.openlineage.utils.spark._get_parent_job_information_as_spark_properties")
+def
test_inject_parent_job_information_into_emr_serverless_properties_does_not_mutate_input(
+ mock_get_parent,
+):
+ """The original configuration_overrides dict is not mutated."""
+ mock_get_parent.return_value = {"spark.openlineage.parentJobNamespace":
"ns"}
+ original = {
+ "applicationConfiguration": [
+ {"classification": "spark-defaults", "properties":
{"spark.driver.memory": "4G"}}
+ ]
+ }
+ import copy
+
+ original_copy = copy.deepcopy(original)
+ inject_parent_job_information_into_emr_serverless_properties(original,
EXAMPLE_CONTEXT)
+ assert original == original_copy
+
+
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
+def
test_inject_transport_information_into_emr_serverless_properties_empty(mock_ol_listener):
+ """With no existing config, transport props are injected into new
spark-defaults."""
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport =
HttpTransport(
+ HttpConfig.from_dict(EXAMPLE_HTTP_TRANSPORT_CONFIG)
+ )
+ result = inject_transport_information_into_emr_serverless_properties(None,
EXAMPLE_CONTEXT)
+ props = result["applicationConfiguration"][0]["properties"]
+ assert props["spark.openlineage.transport.type"] == "http"
+ assert props["spark.openlineage.transport.url"] ==
"https://some-custom.url"
+
+
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
+def
test_inject_transport_information_into_emr_serverless_properties_skips_if_already_present(
+ mock_ol_listener,
+):
+ """Injection is skipped when transport info is already in spark-defaults
properties."""
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ config = {
+ "applicationConfiguration": [
+ {
+ "classification": "spark-defaults",
+ "properties": {"spark.openlineage.transport.type": "http"},
+ }
+ ]
+ }
+ result =
inject_transport_information_into_emr_serverless_properties(config,
EXAMPLE_CONTEXT)
+ assert
result["applicationConfiguration"][0]["properties"]["spark.openlineage.transport.type"]
== "http"
+ fake_listener.adapter.get_or_create_openlineage_client.assert_not_called()
+
+
+@patch("airflow.providers.openlineage.utils.spark._get_parent_job_information_as_spark_properties")
+def
test_inject_parent_job_information_into_emr_serverless_preserves_monitoring_config(mock_get_parent):
+ """Other keys like monitoringConfiguration are preserved."""
+ mock_get_parent.return_value = {"spark.openlineage.parentJobNamespace":
"ns"}
+ config = {
+ "applicationConfiguration": [{"classification": "spark-defaults",
"properties": {}}],
+ "monitoringConfiguration": {"s3MonitoringConfiguration": {"logUri":
"s3://bucket/logs"}},
+ }
+ result =
inject_parent_job_information_into_emr_serverless_properties(config,
EXAMPLE_CONTEXT)
+ assert
result["monitoringConfiguration"]["s3MonitoringConfiguration"]["logUri"] ==
"s3://bucket/logs"
+ assert
result["applicationConfiguration"][0]["properties"]["spark.openlineage.parentJobNamespace"]
== "ns"