This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c2f83ea7c97 Add OpenLineage parent and transport info injection to 
`EmrServerlessStartJobOperator` (#64807)
c2f83ea7c97 is described below

commit c2f83ea7c973aef140b732d17fd720024644ef6e
Author: Rahul Madan <[email protected]>
AuthorDate: Tue Apr 7 13:48:30 2026 +0530

    Add OpenLineage parent and transport info injection to 
`EmrServerlessStartJobOperator` (#64807)
    
    Signed-off-by: Rahul Madan <[email protected]>
---
 .../airflow/providers/amazon/aws/operators/emr.py  |  37 +++-
 .../amazon/aws/operators/test_emr_serverless.py    | 221 +++++++++++++++++++++
 .../common/compat/openlineage/utils/spark.py       |  26 +++
 .../airflow/providers/openlineage/utils/spark.py   |  90 +++++++++
 .../tests/unit/openlineage/utils/test_spark.py     | 145 ++++++++++++++
 5 files changed, 517 insertions(+), 2 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py
index 313cc649489..8efc71289e4 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py
@@ -58,6 +58,10 @@ from airflow.providers.amazon.aws.utils.waiter import (
 )
 from airflow.providers.amazon.aws.utils.waiter_with_logging import wait
 from airflow.providers.amazon.version_compat import NOTSET, ArgNotSet
+from airflow.providers.common.compat.openlineage.utils.spark import (
+    inject_parent_job_information_into_emr_serverless_properties,
+    inject_transport_information_into_emr_serverless_properties,
+)
 from airflow.providers.common.compat.sdk import AirflowException, conf
 from airflow.utils.helpers import exactly_one, prune_dict
 
@@ -1197,6 +1201,14 @@ class 
EmrServerlessStartJobOperator(AwsBaseOperator[EmrServerlessHook]):
     :param cancel_on_kill: If True, the EMR Serverless job will be cancelled 
when the task is killed
         while in deferrable mode. This ensures that orphan jobs are not left 
running in EMR Serverless
         when an Airflow task is cancelled. Defaults to True.
+    :param openlineage_inject_parent_job_info: If True, injects OpenLineage 
parent job information
+        into the EMR Serverless ``spark-defaults`` configuration so the Spark 
job emits a
+        ``parentRunFacet`` linking back to the Airflow task. Defaults to the
+        ``openlineage.spark_inject_parent_job_info`` config value.
+    :param openlineage_inject_transport_info: If True, injects OpenLineage 
transport configuration
+        into the EMR Serverless ``spark-defaults`` configuration so the Spark 
job sends OL events
+        to the same backend as Airflow. Defaults to the
+        ``openlineage.spark_inject_transport_info`` config value.
     """
 
     aws_hook_class = EmrServerlessHook
@@ -1236,6 +1248,12 @@ class 
EmrServerlessStartJobOperator(AwsBaseOperator[EmrServerlessHook]):
         deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         enable_application_ui_links: bool = False,
         cancel_on_kill: bool = True,
+        openlineage_inject_parent_job_info: bool = conf.getboolean(
+            "openlineage", "spark_inject_parent_job_info", fallback=False
+        ),
+        openlineage_inject_transport_info: bool = conf.getboolean(
+            "openlineage", "spark_inject_transport_info", fallback=False
+        ),
         **kwargs,
     ):
         waiter_delay = 60 if waiter_delay is NOTSET else waiter_delay
@@ -1254,6 +1272,8 @@ class 
EmrServerlessStartJobOperator(AwsBaseOperator[EmrServerlessHook]):
         self.deferrable = deferrable
         self.enable_application_ui_links = enable_application_ui_links
         self.cancel_on_kill = cancel_on_kill
+        self.openlineage_inject_parent_job_info = 
openlineage_inject_parent_job_info
+        self.openlineage_inject_transport_info = 
openlineage_inject_transport_info
         super().__init__(**kwargs)
 
         self.client_request_token = client_request_token or str(uuid4())
@@ -1287,6 +1307,19 @@ class 
EmrServerlessStartJobOperator(AwsBaseOperator[EmrServerlessHook]):
             )
         self.log.info("Starting job on Application: %s", self.application_id)
         self.name = self.name or self.config.pop("name", 
f"emr_serverless_job_airflow_{uuid4()}")
+
+        configuration_overrides = self.configuration_overrides
+        if self.openlineage_inject_parent_job_info:
+            self.log.info("Injecting OpenLineage parent job information into 
EMR Serverless configuration.")
+            configuration_overrides = 
inject_parent_job_information_into_emr_serverless_properties(
+                configuration_overrides, context
+            )
+        if self.openlineage_inject_transport_info:
+            self.log.info("Injecting OpenLineage transport information into 
EMR Serverless configuration.")
+            configuration_overrides = 
inject_transport_information_into_emr_serverless_properties(
+                configuration_overrides, context
+            )
+
         args = {
             "clientToken": self.client_request_token,
             "applicationId": self.application_id,
@@ -1295,8 +1328,8 @@ class 
EmrServerlessStartJobOperator(AwsBaseOperator[EmrServerlessHook]):
             "name": self.name,
             **self.config,
         }
-        if self.configuration_overrides is not None:
-            args["configurationOverrides"] = self.configuration_overrides
+        if configuration_overrides is not None:
+            args["configurationOverrides"] = configuration_overrides
         response = self.hook.conn.start_job_run(
             **args,
         )
diff --git 
a/providers/amazon/tests/unit/amazon/aws/operators/test_emr_serverless.py 
b/providers/amazon/tests/unit/amazon/aws/operators/test_emr_serverless.py
index 62cccc528d0..c851114e665 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_emr_serverless.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_emr_serverless.py
@@ -1513,3 +1513,224 @@ class TestEmrServerlessStopOperator:
         )
 
         validate_template_fields(operator)
+
+
+class TestEmrServerlessStartJobOperatorOpenLineageInjection:
+    """Tests for OpenLineage parent job info and transport info injection in 
EmrServerlessStartJobOperator."""
+
+    @mock.patch.object(EmrServerlessHook, "get_waiter")
+    @mock.patch.object(EmrServerlessHook, "conn")
+    @mock.patch(
+        "airflow.providers.amazon.aws.operators.emr"
+        ".inject_parent_job_information_into_emr_serverless_properties"
+    )
+    def test_inject_parent_job_info_called_when_enabled(self, 
mock_inject_parent, mock_conn, mock_get_waiter):
+        mock_inject_parent.side_effect = lambda overrides, ctx: {
+            "applicationConfiguration": [
+                {
+                    "classification": "spark-defaults",
+                    "properties": {"spark.openlineage.parentJobNamespace": 
"ns"},
+                }
+            ]
+        }
+        mock_conn.get_application.return_value = {"application": {"state": 
"STARTED"}}
+        mock_conn.start_job_run.return_value = {
+            "jobRunId": job_run_id,
+            "ResponseMetadata": {"HTTPStatusCode": 200},
+        }
+
+        operator = EmrServerlessStartJobOperator(
+            task_id=task_id,
+            application_id=application_id,
+            execution_role_arn=execution_role_arn,
+            job_driver=job_driver,
+            wait_for_completion=False,
+            openlineage_inject_parent_job_info=True,
+        )
+        operator.execute(mock.MagicMock())
+
+        mock_inject_parent.assert_called_once()
+        call_kwargs = mock_conn.start_job_run.call_args.kwargs
+        config_overrides = call_kwargs["configurationOverrides"]
+        assert (
+            config_overrides["applicationConfiguration"][0]["properties"][
+                "spark.openlineage.parentJobNamespace"
+            ]
+            == "ns"
+        )
+
+    @mock.patch.object(EmrServerlessHook, "get_waiter")
+    @mock.patch.object(EmrServerlessHook, "conn")
+    @mock.patch(
+        "airflow.providers.amazon.aws.operators.emr"
+        ".inject_parent_job_information_into_emr_serverless_properties"
+    )
+    def test_inject_parent_job_info_not_called_when_disabled(
+        self, mock_inject_parent, mock_conn, mock_get_waiter
+    ):
+        mock_conn.get_application.return_value = {"application": {"state": 
"STARTED"}}
+        mock_conn.start_job_run.return_value = {
+            "jobRunId": job_run_id,
+            "ResponseMetadata": {"HTTPStatusCode": 200},
+        }
+
+        operator = EmrServerlessStartJobOperator(
+            task_id=task_id,
+            application_id=application_id,
+            execution_role_arn=execution_role_arn,
+            job_driver=job_driver,
+            wait_for_completion=False,
+            openlineage_inject_parent_job_info=False,
+        )
+        operator.execute(mock.MagicMock())
+
+        mock_inject_parent.assert_not_called()
+
+    @mock.patch.object(EmrServerlessHook, "get_waiter")
+    @mock.patch.object(EmrServerlessHook, "conn")
+    @mock.patch(
+        "airflow.providers.amazon.aws.operators.emr"
+        ".inject_transport_information_into_emr_serverless_properties"
+    )
+    def test_inject_transport_info_called_when_enabled(
+        self, mock_inject_transport, mock_conn, mock_get_waiter
+    ):
+        mock_inject_transport.side_effect = lambda overrides, ctx: {
+            "applicationConfiguration": [
+                {
+                    "classification": "spark-defaults",
+                    "properties": {"spark.openlineage.transport.type": "http"},
+                }
+            ]
+        }
+        mock_conn.get_application.return_value = {"application": {"state": 
"STARTED"}}
+        mock_conn.start_job_run.return_value = {
+            "jobRunId": job_run_id,
+            "ResponseMetadata": {"HTTPStatusCode": 200},
+        }
+
+        operator = EmrServerlessStartJobOperator(
+            task_id=task_id,
+            application_id=application_id,
+            execution_role_arn=execution_role_arn,
+            job_driver=job_driver,
+            wait_for_completion=False,
+            openlineage_inject_transport_info=True,
+        )
+        operator.execute(mock.MagicMock())
+
+        mock_inject_transport.assert_called_once()
+        call_kwargs = mock_conn.start_job_run.call_args.kwargs
+        config_overrides = call_kwargs["configurationOverrides"]
+        assert (
+            
config_overrides["applicationConfiguration"][0]["properties"]["spark.openlineage.transport.type"]
+            == "http"
+        )
+
+    @mock.patch.object(EmrServerlessHook, "get_waiter")
+    @mock.patch.object(EmrServerlessHook, "conn")
+    @mock.patch(
+        "airflow.providers.amazon.aws.operators.emr"
+        ".inject_parent_job_information_into_emr_serverless_properties"
+    )
+    @mock.patch(
+        "airflow.providers.amazon.aws.operators.emr"
+        ".inject_transport_information_into_emr_serverless_properties"
+    )
+    def test_inject_both_parent_and_transport_info(
+        self, mock_inject_transport, mock_inject_parent, mock_conn, 
mock_get_waiter
+    ):
+        mock_inject_parent.side_effect = lambda overrides, ctx: {
+            "applicationConfiguration": [
+                {
+                    "classification": "spark-defaults",
+                    "properties": {"spark.openlineage.parentJobNamespace": 
"ns"},
+                }
+            ]
+        }
+        mock_inject_transport.side_effect = lambda overrides, ctx: {
+            "applicationConfiguration": [
+                {
+                    "classification": "spark-defaults",
+                    "properties": {
+                        **overrides.get("applicationConfiguration", 
[{}])[0].get("properties", {}),
+                        "spark.openlineage.transport.type": "http",
+                    },
+                }
+            ]
+        }
+        mock_conn.get_application.return_value = {"application": {"state": 
"STARTED"}}
+        mock_conn.start_job_run.return_value = {
+            "jobRunId": job_run_id,
+            "ResponseMetadata": {"HTTPStatusCode": 200},
+        }
+
+        operator = EmrServerlessStartJobOperator(
+            task_id=task_id,
+            application_id=application_id,
+            execution_role_arn=execution_role_arn,
+            job_driver=job_driver,
+            wait_for_completion=False,
+            openlineage_inject_parent_job_info=True,
+            openlineage_inject_transport_info=True,
+        )
+        operator.execute(mock.MagicMock())
+
+        mock_inject_parent.assert_called_once()
+        mock_inject_transport.assert_called_once()
+
+    @mock.patch.object(EmrServerlessHook, "get_waiter")
+    @mock.patch.object(EmrServerlessHook, "conn")
+    @mock.patch(
+        "airflow.providers.amazon.aws.operators.emr"
+        ".inject_parent_job_information_into_emr_serverless_properties"
+    )
+    def test_inject_parent_job_info_preserves_existing_config(
+        self, mock_inject_parent, mock_conn, mock_get_waiter
+    ):
+        """Existing configuration_overrides (e.g. monitoringConfiguration) are 
preserved."""
+        existing_config = {
+            "monitoringConfiguration": {"s3MonitoringConfiguration": 
{"logUri": "s3://bucket/logs"}},
+            "applicationConfiguration": [
+                {"classification": "spark-defaults", "properties": 
{"spark.driver.memory": "8G"}}
+            ],
+        }
+        mock_inject_parent.side_effect = lambda overrides, ctx: {
+            **overrides,
+            "applicationConfiguration": [
+                {
+                    "classification": "spark-defaults",
+                    "properties": {
+                        
**overrides["applicationConfiguration"][0]["properties"],
+                        "spark.openlineage.parentJobNamespace": "ns",
+                    },
+                }
+            ],
+        }
+        mock_conn.get_application.return_value = {"application": {"state": 
"STARTED"}}
+        mock_conn.start_job_run.return_value = {
+            "jobRunId": job_run_id,
+            "ResponseMetadata": {"HTTPStatusCode": 200},
+        }
+
+        operator = EmrServerlessStartJobOperator(
+            task_id=task_id,
+            application_id=application_id,
+            execution_role_arn=execution_role_arn,
+            job_driver=job_driver,
+            configuration_overrides=existing_config,
+            wait_for_completion=False,
+            openlineage_inject_parent_job_info=True,
+        )
+        operator.execute(mock.MagicMock())
+
+        call_kwargs = mock_conn.start_job_run.call_args.kwargs
+        config_overrides = call_kwargs["configurationOverrides"]
+        # Monitoring config preserved
+        assert 
config_overrides["monitoringConfiguration"]["s3MonitoringConfiguration"]["logUri"]
 == (
+            "s3://bucket/logs"
+        )
+        # OL parent info injected
+        props = config_overrides["applicationConfiguration"][0]["properties"]
+        assert props["spark.openlineage.parentJobNamespace"] == "ns"
+        assert props["spark.driver.memory"] == "8G"
diff --git 
a/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
 
b/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
index d92dad56dad..91b4fae05d5 100644
--- 
a/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
+++ 
b/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
@@ -24,16 +24,20 @@ log = logging.getLogger(__name__)
 
 if TYPE_CHECKING:
     from airflow.providers.openlineage.utils.spark import (
+        inject_parent_job_information_into_emr_serverless_properties,
         inject_parent_job_information_into_glue_arguments,
         inject_parent_job_information_into_spark_properties,
+        inject_transport_information_into_emr_serverless_properties,
         inject_transport_information_into_glue_arguments,
         inject_transport_information_into_spark_properties,
     )
     from airflow.sdk import Context
 try:
     from airflow.providers.openlineage.utils.spark import (
+        inject_parent_job_information_into_emr_serverless_properties,
         inject_parent_job_information_into_glue_arguments,
         inject_parent_job_information_into_spark_properties,
+        inject_transport_information_into_emr_serverless_properties,
         inject_transport_information_into_glue_arguments,
         inject_transport_information_into_spark_properties,
     )
@@ -67,10 +71,32 @@ except ImportError:
         )
         return script_args
 
+    def inject_parent_job_information_into_emr_serverless_properties(
+        configuration_overrides: dict | None, context: Context
+    ) -> dict:
+        log.warning(
+            "Could not import `airflow.providers.openlineage.plugins.macros`. "
+            "Skipping the injection of OpenLineage parent job information into 
"
+            "EMR Serverless configuration."
+        )
+        return configuration_overrides or {}
+
+    def inject_transport_information_into_emr_serverless_properties(
+        configuration_overrides: dict | None, context: Context
+    ) -> dict:
+        log.warning(
+            "Could not import `airflow.providers.openlineage.plugins.listener`. "
+            "Skipping the injection of OpenLineage transport information into "
+            "EMR Serverless configuration."
+        )
+        return configuration_overrides or {}
+
 
 __all__ = [
+    "inject_parent_job_information_into_emr_serverless_properties",
     "inject_parent_job_information_into_glue_arguments",
     "inject_parent_job_information_into_spark_properties",
+    "inject_transport_information_into_emr_serverless_properties",
     "inject_transport_information_into_glue_arguments",
     "inject_transport_information_into_spark_properties",
 ]
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py 
b/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
index 837946fecbe..4b67d9383d0 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
@@ -265,3 +265,93 @@ def 
inject_transport_information_into_glue_arguments(script_args: dict, context:
 
     combined_conf = f"{existing_conf} --conf {new_conf_parts}" if 
existing_conf else new_conf_parts
     return {**script_args, "--conf": combined_conf}
+
+
+def _get_or_create_spark_defaults_properties(configuration_overrides: dict) -> 
dict:
+    """
+    Find or create the ``spark-defaults`` classification entry and return its 
``properties`` dict.
+
+    The ``configuration_overrides`` structure for EMR Serverless is::
+
+        {"applicationConfiguration": [{"classification": "spark-defaults", 
"properties": {...}}, ...]}
+
+    If no ``spark-defaults`` entry exists, one is created.
+    """
+    app_config = 
configuration_overrides.setdefault("applicationConfiguration", [])
+    for entry in app_config:
+        if entry.get("classification") == "spark-defaults":
+            entry.setdefault("properties", {})
+            return entry["properties"]
+    new_entry: dict = {"classification": "spark-defaults", "properties": {}}
+    app_config.append(new_entry)
+    return new_entry["properties"]
+
+
+def inject_parent_job_information_into_emr_serverless_properties(
+    configuration_overrides: dict | None, context: Context
+) -> dict:
+    """
+    Inject parent job information into EMR Serverless configuration if not 
already present.
+
+    EMR Serverless passes Spark properties via a nested 
``applicationConfiguration``
+    structure with ``classification: spark-defaults``.
+
+    Args:
+        configuration_overrides: EMR Serverless configuration overrides dict 
(may be None).
+        context: The context containing task instance information.
+
+    Returns:
+        Modified configuration_overrides dict with OpenLineage parent job 
information injected.
+    """
+    import copy
+
+    result = copy.deepcopy(configuration_overrides) if configuration_overrides 
else {}
+    properties = _get_or_create_spark_defaults_properties(result)
+
+    if _is_parent_job_information_present_in_spark_properties(properties):
+        log.info(
+            "Some OpenLineage properties with parent job information are 
already present "
+            "in EMR Serverless Spark properties. Skipping the injection of 
OpenLineage "
+            "parent job information into EMR Serverless configuration."
+        )
+        return result
+
+    parent_props = _get_parent_job_information_as_spark_properties(context)
+    if parent_props:
+        properties.update(parent_props)
+    return result
+
+
+def inject_transport_information_into_emr_serverless_properties(
+    configuration_overrides: dict | None, context: Context
+) -> dict:
+    """
+    Inject transport information into EMR Serverless configuration if not 
already present.
+
+    EMR Serverless passes Spark properties via a nested 
``applicationConfiguration``
+    structure with ``classification: spark-defaults``.
+
+    Args:
+        configuration_overrides: EMR Serverless configuration overrides dict 
(may be None).
+        context: The context containing task instance information.
+
+    Returns:
+        Modified configuration_overrides dict with OpenLineage transport 
information injected.
+    """
+    import copy
+
+    result = copy.deepcopy(configuration_overrides) if configuration_overrides 
else {}
+    properties = _get_or_create_spark_defaults_properties(result)
+
+    if _is_transport_information_present_in_spark_properties(properties):
+        log.info(
+            "Some OpenLineage properties with transport information are 
already present "
+            "in EMR Serverless Spark properties. Skipping the injection of 
OpenLineage "
+            "transport information into EMR Serverless configuration."
+        )
+        return result
+
+    transport_props = _get_transport_information_as_spark_properties()
+    if transport_props:
+        properties.update(transport_props)
+    return result
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_spark.py 
b/providers/openlineage/tests/unit/openlineage/utils/test_spark.py
index be835d6a429..8ffea2dbd0d 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_spark.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_spark.py
@@ -31,8 +31,10 @@ from airflow.providers.openlineage.utils.spark import (
     _get_transport_information_as_spark_properties,
     _is_parent_job_information_present_in_spark_properties,
     _is_transport_information_present_in_spark_properties,
+    inject_parent_job_information_into_emr_serverless_properties,
     inject_parent_job_information_into_glue_arguments,
     inject_parent_job_information_into_spark_properties,
+    inject_transport_information_into_emr_serverless_properties,
     inject_transport_information_into_glue_arguments,
     inject_transport_information_into_spark_properties,
 )
@@ -462,3 +464,146 @@ def 
test_inject_transport_information_into_glue_arguments_skips_if_already_prese
     result = inject_transport_information_into_glue_arguments(script_args, 
EXAMPLE_CONTEXT)
     assert result["--conf"] == existing
     fake_listener.adapter.get_or_create_openlineage_client.assert_not_called()
+
+
+# ---------------------------------------------------------------------------
+# EMR Serverless configuration injection tests
+# ---------------------------------------------------------------------------
+
+
+@patch("airflow.providers.openlineage.utils.spark._get_parent_job_information_as_spark_properties")
+def 
test_inject_parent_job_information_into_emr_serverless_properties_none_overrides(mock_get_parent):
+    """When configuration_overrides is None, creates the full structure with 
injected properties."""
+    mock_get_parent.return_value = {
+        "spark.openlineage.parentJobNamespace": "ns",
+        "spark.openlineage.parentJobName": "dag.task",
+    }
+    result = 
inject_parent_job_information_into_emr_serverless_properties(None, 
EXAMPLE_CONTEXT)
+    assert "applicationConfiguration" in result
+    spark_defaults = result["applicationConfiguration"][0]
+    assert spark_defaults["classification"] == "spark-defaults"
+    assert 
spark_defaults["properties"]["spark.openlineage.parentJobNamespace"] == "ns"
+    assert spark_defaults["properties"]["spark.openlineage.parentJobName"] == 
"dag.task"
+
+
+@patch("airflow.providers.openlineage.utils.spark._get_parent_job_information_as_spark_properties")
+def 
test_inject_parent_job_information_into_emr_serverless_properties_existing_spark_defaults(
+    mock_get_parent,
+):
+    """Appends to existing spark-defaults properties."""
+    mock_get_parent.return_value = {"spark.openlineage.parentJobNamespace": 
"ns"}
+    config = {
+        "applicationConfiguration": [
+            {
+                "classification": "spark-defaults",
+                "properties": {"spark.driver.memory": "8G"},
+            }
+        ]
+    }
+    result = 
inject_parent_job_information_into_emr_serverless_properties(config, 
EXAMPLE_CONTEXT)
+    props = result["applicationConfiguration"][0]["properties"]
+    assert props["spark.driver.memory"] == "8G"
+    assert props["spark.openlineage.parentJobNamespace"] == "ns"
+
+
+@patch("airflow.providers.openlineage.utils.spark._get_parent_job_information_as_spark_properties")
+def 
test_inject_parent_job_information_into_emr_serverless_properties_no_spark_defaults(mock_get_parent):
+    """Creates spark-defaults entry when only other classifications exist."""
+    mock_get_parent.return_value = {"spark.openlineage.parentJobNamespace": 
"ns"}
+    config = {
+        "applicationConfiguration": [
+            {"classification": "spark-env", "properties": {"PYSPARK_PYTHON": 
"/usr/bin/python3"}}
+        ]
+    }
+    result = 
inject_parent_job_information_into_emr_serverless_properties(config, 
EXAMPLE_CONTEXT)
+    # Original entry preserved
+    assert result["applicationConfiguration"][0]["classification"] == 
"spark-env"
+    # New spark-defaults entry added
+    spark_defaults = result["applicationConfiguration"][1]
+    assert spark_defaults["classification"] == "spark-defaults"
+    assert 
spark_defaults["properties"]["spark.openlineage.parentJobNamespace"] == "ns"
+
+
+@patch("airflow.providers.openlineage.utils.spark._get_parent_job_information_as_spark_properties")
+def 
test_inject_parent_job_information_into_emr_serverless_properties_skips_if_already_present(
+    mock_get_parent,
+):
+    """Injection is skipped when parent job info is already in spark-defaults 
properties."""
+    config = {
+        "applicationConfiguration": [
+            {
+                "classification": "spark-defaults",
+                "properties": {"spark.openlineage.parentJobNamespace": 
"already_there"},
+            }
+        ]
+    }
+    result = 
inject_parent_job_information_into_emr_serverless_properties(config, 
EXAMPLE_CONTEXT)
+    assert 
result["applicationConfiguration"][0]["properties"]["spark.openlineage.parentJobNamespace"]
 == (
+        "already_there"
+    )
+    mock_get_parent.assert_not_called()
+
+
+@patch("airflow.providers.openlineage.utils.spark._get_parent_job_information_as_spark_properties")
+def 
test_inject_parent_job_information_into_emr_serverless_properties_does_not_mutate_input(
+    mock_get_parent,
+):
+    """The original configuration_overrides dict is not mutated."""
+    mock_get_parent.return_value = {"spark.openlineage.parentJobNamespace": 
"ns"}
+    original = {
+        "applicationConfiguration": [
+            {"classification": "spark-defaults", "properties": 
{"spark.driver.memory": "4G"}}
+        ]
+    }
+    import copy
+
+    original_copy = copy.deepcopy(original)
+    inject_parent_job_information_into_emr_serverless_properties(original, 
EXAMPLE_CONTEXT)
+    assert original == original_copy
+
+
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
+def 
test_inject_transport_information_into_emr_serverless_properties_empty(mock_ol_listener):
+    """With no existing config, transport props are injected into new 
spark-defaults."""
+    fake_listener = mock.MagicMock()
+    mock_ol_listener.return_value = fake_listener
+    
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
+        HttpConfig.from_dict(EXAMPLE_HTTP_TRANSPORT_CONFIG)
+    )
+    result = inject_transport_information_into_emr_serverless_properties(None, 
EXAMPLE_CONTEXT)
+    props = result["applicationConfiguration"][0]["properties"]
+    assert props["spark.openlineage.transport.type"] == "http"
+    assert props["spark.openlineage.transport.url"] == "https://some-custom.url"
+
+
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
+def 
test_inject_transport_information_into_emr_serverless_properties_skips_if_already_present(
+    mock_ol_listener,
+):
+    """Injection is skipped when transport info is already in spark-defaults 
properties."""
+    fake_listener = mock.MagicMock()
+    mock_ol_listener.return_value = fake_listener
+    config = {
+        "applicationConfiguration": [
+            {
+                "classification": "spark-defaults",
+                "properties": {"spark.openlineage.transport.type": "http"},
+            }
+        ]
+    }
+    result = 
inject_transport_information_into_emr_serverless_properties(config, 
EXAMPLE_CONTEXT)
+    assert 
result["applicationConfiguration"][0]["properties"]["spark.openlineage.transport.type"]
 == "http"
+    fake_listener.adapter.get_or_create_openlineage_client.assert_not_called()
+
+
+@patch("airflow.providers.openlineage.utils.spark._get_parent_job_information_as_spark_properties")
+def 
test_inject_parent_job_information_into_emr_serverless_preserves_monitoring_config(mock_get_parent):
+    """Other keys like monitoringConfiguration are preserved."""
+    mock_get_parent.return_value = {"spark.openlineage.parentJobNamespace": 
"ns"}
+    config = {
+        "applicationConfiguration": [{"classification": "spark-defaults", 
"properties": {}}],
+        "monitoringConfiguration": {"s3MonitoringConfiguration": {"logUri": 
"s3://bucket/logs"}},
+    }
+    result = 
inject_parent_job_information_into_emr_serverless_properties(config, 
EXAMPLE_CONTEXT)
+    assert 
result["monitoringConfiguration"]["s3MonitoringConfiguration"]["logUri"] == 
"s3://bucket/logs"
+    assert 
result["applicationConfiguration"][0]["properties"]["spark.openlineage.parentJobNamespace"]
 == "ns"

Reply via email to