This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch glue-pass-params-v2
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 758753706b88fb891e5a43c6702f65756f1f85d6
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Fri Dec 5 13:30:17 2025 +0100

    Glue: pass OpenLineage params
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 .../airflow/providers/amazon/aws/operators/glue.py |  10 ++
 .../providers/amazon/aws/utils/openlineage.py      |  74 +++++++++++
 .../unit/amazon/aws/utils/test_openlineage.py      | 144 +++++++++++++++++++++
 .../common/compat/openlineage/utils/spark.py       |   6 +
 .../airflow/providers/openlineage/utils/spark.py   |  54 ++++++--
 .../prek/check_contextmanager_class_decorators.py  |   0
 6 files changed, 280 insertions(+), 8 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py
index 536121de679..d6dbb9ccc8a 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py
@@ -36,6 +36,9 @@ from airflow.providers.amazon.aws.triggers.glue import (
 )
 from airflow.providers.amazon.aws.utils import validate_execute_complete_event
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
+from airflow.providers.amazon.aws.utils.openlineage import (
+    inject_parent_job_information_into_glue_script_args,
+)
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
@@ -139,10 +142,14 @@ class GlueJobOperator(AwsBaseOperator[GlueJobHook]):
         job_poll_interval: int | float = 6,
         waiter_delay: int = 60,
         waiter_max_attempts: int = 75,
+        openlineage_inject_parent_job_info: bool = conf.getboolean(
+            "openlineage", "spark_inject_parent_job_info", fallback=False
+        ),
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.job_name = job_name
+        self._openlineage_inject_parent_job_info = openlineage_inject_parent_job_info
         self.job_desc = job_desc
         self.script_location = script_location
         self.concurrent_run_limit = concurrent_run_limit or 1
@@ -217,6 +224,9 @@ class GlueJobOperator(AwsBaseOperator[GlueJobHook]):
 
         :return: the current Glue job ID.
         """
+        if self._openlineage_inject_parent_job_info:
+            self.log.debug("Injecting OpenLineage parent job information into Glue script_args.")
+            self.script_args = inject_parent_job_information_into_glue_script_args(self.script_args, context)
         self.log.info(
             "Initializing AWS Glue Job: %s. Wait for completion: %s",
             self.job_name,
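
Note: a minimal usage sketch of the new flag from a DAG, assuming the existing
GlueJobOperator API; the task id, job name, and S3 paths below are hypothetical.
When the flag is omitted, it falls back to the ``[openlineage]
spark_inject_parent_job_info`` configuration option::

    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

    submit_glue_job = GlueJobOperator(
        task_id="submit_glue_job",             # hypothetical task id
        job_name="my-glue-job",                # hypothetical Glue job name
        script_location="s3://bucket/etl.py",  # hypothetical script path
        script_args={"--conf": "spark.executor.memory=4g"},
        # opt in to injecting OpenLineage parent job info into script_args
        openlineage_inject_parent_job_info=True,
    )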
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/utils/openlineage.py b/providers/amazon/src/airflow/providers/amazon/aws/utils/openlineage.py
index be5703e2f6e..65946ef2869 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/utils/openlineage.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/utils/openlineage.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import logging
 from typing import TYPE_CHECKING, Any
 
 from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
@@ -28,9 +29,13 @@ from airflow.providers.common.compat.openlineage.facet import (
     SchemaDatasetFacet,
     SchemaDatasetFacetFields,
 )
+from airflow.providers.common.compat.openlineage.utils.spark import get_parent_job_information
 
 if TYPE_CHECKING:
     from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
+    from airflow.utils.context import Context
+
+log = logging.getLogger(__name__)
 
 
 def get_facets_from_redshift_table(
@@ -136,3 +141,72 @@ def get_identity_column_lineage_facet(
         }
     )
     return column_lineage_facet
+
+
+def inject_parent_job_information_into_glue_script_args(
+    script_args: dict[str, Any], context: Context
+) -> dict[str, Any]:
+    """
+    Inject OpenLineage parent job info into Glue script_args via --conf.
+
+    The parent job information is injected as Spark configuration properties via
+    the ``--conf`` argument. This uses the same ``spark.openlineage.*`` properties
+    that the OpenLineage Spark integration recognizes.
+
+    Note: We use ``--conf`` instead of ``--customer-driver-env-vars`` because AWS Glue
+    requires all environment variable keys to have a ``CUSTOMER_`` prefix, which would
+    not be recognized by the OpenLineage Spark integration.
+
+    AWS Glue ``--conf`` format requires multiple properties to be formatted as::
+
+        spark.prop1=val1 --conf spark.prop2=val2 --conf spark.prop3=val3
+
+    See: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
+
+    Behavior:
+        - If OpenLineage provider is not available, returns script_args unchanged
+        - If user already set any ``spark.openlineage.parent*`` properties in ``--conf``,
+          skips injection to preserve user-provided values
+        - Appends to existing ``--conf`` if present (with ``--conf`` prefix)
+        - Returns new dict (does not mutate original)
+
+    Args:
+        script_args: The Glue job's script_args dict.
+        context: Airflow task context.
+
+    Returns:
+        Modified script_args with OpenLineage Spark properties injected in ``--conf``.
+    """
+    info = get_parent_job_information(context)
+    if info is None:
+        return script_args
+
+    existing_conf = script_args.get("--conf", "")
+
+    # Skip if user already configured OpenLineage parent properties
+    if "spark.openlineage.parent" in existing_conf:
+        log.debug(
+            "OpenLineage parent job properties already present in --conf. "
+            "Skipping injection to preserve user-provided values."
+        )
+        return script_args
+
+    ol_spark_properties = [
+        f"spark.openlineage.parentJobNamespace={info.parent_job_namespace}",
+        f"spark.openlineage.parentJobName={info.parent_job_name}",
+        f"spark.openlineage.parentRunId={info.parent_run_id}",
+        f"spark.openlineage.rootParentJobNamespace={info.root_parent_job_namespace}",
+        f"spark.openlineage.rootParentJobName={info.root_parent_job_name}",
+        f"spark.openlineage.rootParentRunId={info.root_parent_run_id}",
+    ]
+
+    # AWS Glue --conf format: "prop1=val1 --conf prop2=val2 --conf prop3=val3"
+    # First property has no --conf prefix, subsequent ones do
+    new_conf = " --conf ".join(ol_spark_properties)
+    if existing_conf:
+        new_conf = f"{existing_conf} --conf {new_conf}"
+
+    new_script_args = {**script_args}
+    new_script_args["--conf"] = new_conf
+
+    return new_script_args
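
For illustration, assuming an example context where the lineage macros resolve to
namespace "default" and job "dag_id.task_id" (run ids below are hypothetical), the
chained Glue ``--conf`` value described in the docstring above comes out as::

    args = {"--conf": "spark.executor.memory=4g"}
    new_args = inject_parent_job_information_into_glue_script_args(args, context)
    # new_args["--conf"] ==
    #   "spark.executor.memory=4g"
    #   " --conf spark.openlineage.parentJobNamespace=default"
    #   " --conf spark.openlineage.parentJobName=dag_id.task_id"
    #   " --conf spark.openlineage.parentRunId=uuid-123"           # hypothetical
    #   " --conf spark.openlineage.rootParentJobNamespace=default"
    #   " --conf spark.openlineage.rootParentJobName=dag_id"
    #   " --conf spark.openlineage.rootParentRunId=uuid-456"       # hypothetical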
diff --git a/providers/amazon/tests/unit/amazon/aws/utils/test_openlineage.py b/providers/amazon/tests/unit/amazon/aws/utils/test_openlineage.py
index 3790a590238..ac58420bafc 100644
--- a/providers/amazon/tests/unit/amazon/aws/utils/test_openlineage.py
+++ b/providers/amazon/tests/unit/amazon/aws/utils/test_openlineage.py
@@ -17,6 +17,7 @@
 
 from __future__ import annotations
 
+from datetime import datetime
 from unittest import mock
 
 import pytest
@@ -26,6 +27,7 @@ from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
 from airflow.providers.amazon.aws.utils.openlineage import (
     get_facets_from_redshift_table,
     get_identity_column_lineage_facet,
+    inject_parent_job_information_into_glue_script_args,
 )
 from airflow.providers.common.compat.openlineage.facet import (
     ColumnLineageDatasetFacet,
@@ -34,6 +36,17 @@ from airflow.providers.common.compat.openlineage.facet import (
     InputField,
 )
 
+EXAMPLE_CONTEXT = {
+    "ti": mock.MagicMock(
+        dag_id="dag_id",
+        task_id="task_id",
+        try_number=1,
+        map_index=-1,
+        logical_date=datetime(2024, 11, 11),
+        dag_run=mock.MagicMock(logical_date=datetime(2024, 11, 11), clear_number=0),
+    )
+}
+
 
 @mock.patch("airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook.get_records")
 def test_get_facets_from_redshift_table_sql_hook(mock_get_records):
@@ -168,3 +181,134 @@ def test_get_identity_column_lineage_facet_no_input_datasets():
         ValueError, match="When providing `field_names` You must provide at least one `input_dataset`."
     ):
         get_identity_column_lineage_facet(field_names=field_names, input_datasets=input_datasets)
+
+
+class TestInjectParentJobInformationIntoGlueScriptArgs:
+    """Tests for injecting OpenLineage parent job info via --conf Spark 
properties."""
+
+    @mock.patch(
+        "airflow.providers.amazon.aws.utils.openlineage.get_parent_job_information",
+        return_value=None,
+    )
+    def test_skips_when_openlineage_not_available(self, mock_get_parent_info):
+        """Returns unchanged script_args when OpenLineage is not available."""
+        result = inject_parent_job_information_into_glue_script_args({}, EXAMPLE_CONTEXT)
+        assert result == {}
+
+    @mock.patch("airflow.providers.amazon.aws.utils.openlineage.get_parent_job_information")
+    def test_injects_into_empty_script_args(self, mock_get_parent_info):
+        """Injects --conf with Spark OpenLineage properties into empty 
script_args."""
+        from airflow.providers.openlineage.utils.spark import 
ParentJobInformation
+
+        mock_get_parent_info.return_value = ParentJobInformation(
+            parent_job_namespace="default",
+            parent_job_name="dag_id.task_id",
+            parent_run_id="uuid-123",
+            root_parent_job_namespace="default",
+            root_parent_job_name="dag_id",
+            root_parent_run_id="uuid-456",
+        )
+
+        result = inject_parent_job_information_into_glue_script_args({}, EXAMPLE_CONTEXT)
+
+        assert "--conf" in result
+        conf = result["--conf"]
+        # Verify AWS Glue --conf format: "prop1=val1 --conf prop2=val2 --conf ..."
+        assert "spark.openlineage.parentJobNamespace=default" in conf
+        assert "spark.openlineage.parentJobName=dag_id.task_id" in conf
+        assert "spark.openlineage.parentRunId=uuid-123" in conf
+        assert "spark.openlineage.rootParentJobNamespace=default" in conf
+        assert "spark.openlineage.rootParentJobName=dag_id" in conf
+        assert "spark.openlineage.rootParentRunId=uuid-456" in conf
+        # Verify the format uses " --conf " as separator between properties
+        assert " --conf " in conf
+
+    @mock.patch("airflow.providers.amazon.aws.utils.openlineage.get_parent_job_information")
+    def test_appends_to_existing_conf(self, mock_get_parent_info):
+        """Appends OpenLineage properties to existing --conf value with --conf 
prefix."""
+        from airflow.providers.openlineage.utils.spark import 
ParentJobInformation
+
+        mock_get_parent_info.return_value = ParentJobInformation(
+            parent_job_namespace="default",
+            parent_job_name="dag.task",
+            parent_run_id="uuid-123",
+            root_parent_job_namespace="default",
+            root_parent_job_name="dag",
+            root_parent_run_id="uuid-456",
+        )
+
+        existing = {"--conf": "spark.executor.memory=4g --conf 
spark.driver.memory=2g"}
+        result = inject_parent_job_information_into_glue_script_args(existing, 
EXAMPLE_CONTEXT)
+
+        conf = result["--conf"]
+        # Original conf preserved at the beginning
+        assert conf.startswith("spark.executor.memory=4g --conf spark.driver.memory=2g")
+        # OpenLineage properties added with --conf prefix
+        assert " --conf spark.openlineage.parentJobName=dag.task" in conf
+
+    @mock.patch("airflow.providers.amazon.aws.utils.openlineage.get_parent_job_information")
+    def test_skips_when_ol_properties_already_in_conf(self, mock_get_parent_info):
+        """Skips injection if user already set spark.openlineage.parent* properties."""
+        from airflow.providers.openlineage.utils.spark import ParentJobInformation
+
+        mock_get_parent_info.return_value = ParentJobInformation(
+            parent_job_namespace="default",
+            parent_job_name="dag.task",
+            parent_run_id="uuid-123",
+            root_parent_job_namespace="default",
+            root_parent_job_name="dag",
+            root_parent_run_id="uuid-456",
+        )
+
+        # User already set their own parent job info
+        existing = {"--conf": "spark.openlineage.parentJobName=custom_job"}
+        result = inject_parent_job_information_into_glue_script_args(existing, EXAMPLE_CONTEXT)
+
+        # Should return unchanged - user's values preserved
+        assert result == existing
+
+    @mock.patch("airflow.providers.amazon.aws.utils.openlineage.get_parent_job_information")
+    def test_preserves_other_script_args(self, mock_get_parent_info):
+        """Preserves other script_args while adding --conf."""
+        from airflow.providers.openlineage.utils.spark import ParentJobInformation
+
+        mock_get_parent_info.return_value = ParentJobInformation(
+            parent_job_namespace="default",
+            parent_job_name="dag.task",
+            parent_run_id="uuid-123",
+            root_parent_job_namespace="default",
+            root_parent_job_name="dag",
+            root_parent_run_id="uuid-456",
+        )
+
+        existing = {"--input": "s3://bucket/input", "--output": 
"s3://bucket/output"}
+        result = inject_parent_job_information_into_glue_script_args(existing, 
EXAMPLE_CONTEXT)
+
+        assert result["--input"] == "s3://bucket/input"
+        assert result["--output"] == "s3://bucket/output"
+        assert "--conf" in result
+
+    @mock.patch("airflow.providers.amazon.aws.utils.openlineage.get_parent_job_information")
+    def test_does_not_mutate_original_script_args(self, mock_get_parent_info):
+        """Ensures original script_args dict is not mutated."""
+        from airflow.providers.openlineage.utils.spark import ParentJobInformation
+
+        mock_get_parent_info.return_value = ParentJobInformation(
+            parent_job_namespace="default",
+            parent_job_name="dag.task",
+            parent_run_id="uuid-123",
+            root_parent_job_namespace="default",
+            root_parent_job_name="dag",
+            root_parent_run_id="uuid-456",
+        )
+
+        original = {"--input": "s3://bucket/input"}
+        original_copy = dict(original)
+        result = inject_parent_job_information_into_glue_script_args(original, EXAMPLE_CONTEXT)
+
+        # Original unchanged
+        assert original == original_copy
+        # Result is different object
+        assert result is not original
+        assert "--conf" in result
+        assert "--conf" not in original
diff --git a/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py b/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
index 1028bf3debf..414acce2081 100644
--- a/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
+++ b/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
@@ -24,12 +24,14 @@ log = logging.getLogger(__name__)
 
 if TYPE_CHECKING:
     from airflow.providers.openlineage.utils.spark import (
+        get_parent_job_information,
         inject_parent_job_information_into_spark_properties,
         inject_transport_information_into_spark_properties,
     )
     from airflow.sdk import Context
 try:
     from airflow.providers.openlineage.utils.spark import (
+        get_parent_job_information,
         inject_parent_job_information_into_spark_properties,
         inject_transport_information_into_spark_properties,
     )
@@ -49,8 +51,12 @@ except ImportError:
         )
         return properties
 
+    def get_parent_job_information(context: Context) -> None:
+        return None
+
 
 __all__ = [
     "inject_parent_job_information_into_spark_properties",
     "inject_transport_information_into_spark_properties",
+    "get_parent_job_information",
 ]
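
A short sketch of the compat contract (illustrative only): consumers such as the
Glue helper above import through the compat module, which transparently degrades
to a no-op returning None when the OpenLineage provider is not installed::

    from airflow.providers.common.compat.openlineage.utils.spark import (
        get_parent_job_information,
    )

    info = get_parent_job_information(context)
    if info is None:
        # OpenLineage provider unavailable; callers leave script_args untouched
        ...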
diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py b/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
index a92ac25eab2..438789e0c9c 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
@@ -18,13 +18,14 @@
 from __future__ import annotations
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, NamedTuple
 
 from airflow.providers.openlineage.plugins.listener import get_openlineage_listener
 from airflow.providers.openlineage.plugins.macros import (
     lineage_job_name,
     lineage_job_namespace,
     lineage_root_job_name,
+    lineage_root_job_namespace,
     lineage_root_run_id,
     lineage_run_id,
 )
@@ -35,6 +36,43 @@ if TYPE_CHECKING:
 log = logging.getLogger(__name__)
 
 
+class ParentJobInformation(NamedTuple):
+    """Container for OpenLineage parent job information."""
+
+    parent_job_namespace: str
+    parent_job_name: str
+    parent_run_id: str
+    root_parent_job_namespace: str
+    root_parent_job_name: str
+    root_parent_run_id: str
+
+
+def get_parent_job_information(context: Context) -> ParentJobInformation | None:
+    """
+    Retrieve parent job information from the Airflow context.
+
+    This function extracts OpenLineage parent job details from the task instance,
+    which can be used by various integrations (Spark, Glue, etc.) to propagate
+    lineage information to child jobs.
+
+    Args:
+        context: The Airflow context containing task instance information.
+
+    Returns:
+        ParentJobInformation containing namespace, job name, and run IDs
+        for both parent and root parent.
+    """
+    ti = context["ti"]
+    return ParentJobInformation(
+        parent_job_namespace=lineage_job_namespace(),
+        parent_job_name=lineage_job_name(ti),  # type: ignore[arg-type]
+        parent_run_id=lineage_run_id(ti),  # type: ignore[arg-type]
+        root_parent_job_namespace=lineage_root_job_namespace(),
+        root_parent_job_name=lineage_root_job_name(ti),  # type: ignore[arg-type]
+        root_parent_run_id=lineage_root_run_id(ti),  # type: ignore[arg-type]
+    )
+
+
 def _get_parent_job_information_as_spark_properties(context: Context) -> dict:
     """
     Retrieve parent job information as Spark properties.
@@ -45,14 +83,14 @@ def _get_parent_job_information_as_spark_properties(context: Context) -> dict:
     Returns:
         Spark properties with the parent job information.
     """
-    ti = context["ti"]
+    info = get_parent_job_information(context)
     return {
-        "spark.openlineage.parentJobNamespace": lineage_job_namespace(),
-        "spark.openlineage.parentJobName": lineage_job_name(ti),  # type: 
ignore[arg-type]
-        "spark.openlineage.parentRunId": lineage_run_id(ti),  # type: 
ignore[arg-type]
-        "spark.openlineage.rootParentRunId": lineage_root_run_id(ti),  # type: 
ignore[arg-type]
-        "spark.openlineage.rootParentJobName": lineage_root_job_name(ti),  # 
type: ignore[arg-type]
-        "spark.openlineage.rootParentJobNamespace": lineage_job_namespace(),
+        "spark.openlineage.parentJobNamespace": info.parent_job_namespace,
+        "spark.openlineage.parentJobName": info.parent_job_name,
+        "spark.openlineage.parentRunId": info.parent_run_id,
+        "spark.openlineage.rootParentRunId": info.root_parent_run_id,
+        "spark.openlineage.rootParentJobName": info.root_parent_job_name,
+        "spark.openlineage.rootParentJobNamespace": 
info.root_parent_job_namespace,
     }
 
 
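Since ParentJobInformation is a NamedTuple, its fields support both attribute and
positional access; a tiny illustrative snippet with hypothetical values mirroring
the unit tests above::

    info = ParentJobInformation(
        parent_job_namespace="default",
        parent_job_name="dag_id.task_id",
        parent_run_id="uuid-123",
        root_parent_job_namespace="default",
        root_parent_job_name="dag_id",
        root_parent_run_id="uuid-456",
    )
    assert info.parent_job_name == info[1] == "dag_id.task_id"
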
diff --git a/scripts/ci/prek/check_contextmanager_class_decorators.py b/scripts/ci/prek/check_contextmanager_class_decorators.py
old mode 100644
new mode 100755
