This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch glue-pass-params-v2
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 758753706b88fb891e5a43c6702f65756f1f85d6
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Fri Dec 5 13:30:17 2025 +0100

    glue pass openlineage params

    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 .../airflow/providers/amazon/aws/operators/glue.py |  10 ++
 .../providers/amazon/aws/utils/openlineage.py      |  74 +++++++++++
 .../unit/amazon/aws/utils/test_openlineage.py      | 144 +++++++++++++++++++++
 .../common/compat/openlineage/utils/spark.py       |   6 +
 .../airflow/providers/openlineage/utils/spark.py   |  54 ++++++--
 .../prek/check_contextmanager_class_decorators.py  |   0
 6 files changed, 280 insertions(+), 8 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py
index 536121de679..d6dbb9ccc8a 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py
@@ -36,6 +36,9 @@ from airflow.providers.amazon.aws.triggers.glue import (
 )
 from airflow.providers.amazon.aws.utils import validate_execute_complete_event
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
+from airflow.providers.amazon.aws.utils.openlineage import (
+    inject_parent_job_information_into_glue_script_args,
+)
 from airflow.providers.common.compat.sdk import AirflowException

 if TYPE_CHECKING:
@@ -139,10 +142,14 @@ class GlueJobOperator(AwsBaseOperator[GlueJobHook]):
         job_poll_interval: int | float = 6,
         waiter_delay: int = 60,
         waiter_max_attempts: int = 75,
+        openlineage_inject_parent_job_info: bool = conf.getboolean(
+            "openlineage", "spark_inject_parent_job_info", fallback=False
+        ),
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.job_name = job_name
+        self._openlineage_inject_parent_job_info = openlineage_inject_parent_job_info
         self.job_desc = job_desc
         self.script_location = script_location
         self.concurrent_run_limit = concurrent_run_limit or 1
@@ -217,6 +224,9 @@ class GlueJobOperator(AwsBaseOperator[GlueJobHook]):

         :return: the current Glue job ID.
         """
+        if self._openlineage_inject_parent_job_info:
+            self.log.debug("Injecting OpenLineage parent job information into Glue script_args.")
+            self.script_args = inject_parent_job_information_into_glue_script_args(self.script_args, context)
         self.log.info(
             "Initializing AWS Glue Job: %s. Wait for completion: %s",
             self.job_name,
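(For context: a minimal usage sketch of the new operator flag, not part of the commit itself; the task id, job name, S3 paths, and script_args below are hypothetical.)

    # Hypothetical usage sketch of the flag added above; names and paths are
    # illustrative only.
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

    submit_glue_job = GlueJobOperator(
        task_id="submit_glue_job",
        job_name="example-glue-job",
        script_location="s3://example-bucket/scripts/job.py",
        script_args={"--input": "s3://example-bucket/input"},
        # Opt in explicitly; otherwise the operator falls back to the
        # [openlineage] spark_inject_parent_job_info configuration value.
        openlineage_inject_parent_job_info=True,
    )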
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/utils/openlineage.py b/providers/amazon/src/airflow/providers/amazon/aws/utils/openlineage.py
index be5703e2f6e..65946ef2869 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/utils/openlineage.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/utils/openlineage.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations

+import logging
 from typing import TYPE_CHECKING, Any

 from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
@@ -28,9 +29,13 @@ from airflow.providers.common.compat.openlineage.facet import (
     SchemaDatasetFacet,
     SchemaDatasetFacetFields,
 )
+from airflow.providers.common.compat.openlineage.utils.spark import get_parent_job_information

 if TYPE_CHECKING:
     from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
+    from airflow.utils.context import Context
+
+log = logging.getLogger(__name__)


 def get_facets_from_redshift_table(
@@ -136,3 +141,72 @@ def get_identity_column_lineage_facet(
         }
     )
     return column_lineage_facet
+
+
+def inject_parent_job_information_into_glue_script_args(
+    script_args: dict[str, Any], context: Context
+) -> dict[str, Any]:
+    """
+    Inject OpenLineage parent job info into Glue script_args via --conf.
+
+    The parent job information is injected as Spark configuration properties via
+    the ``--conf`` argument. This uses the same ``spark.openlineage.*`` properties
+    that the OpenLineage Spark integration recognizes.
+
+    Note: We use ``--conf`` instead of ``--customer-driver-env-vars`` because AWS Glue
+    requires all environment variable keys to have a ``CUSTOMER_`` prefix, which would
+    not be recognized by the OpenLineage Spark integration.
+
+    AWS Glue ``--conf`` format requires multiple properties to be formatted as::
+
+        spark.prop1=val1 --conf spark.prop2=val2 --conf spark.prop3=val3
+
+    See: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
+
+    Behavior:
+        - If the OpenLineage provider is not available, returns ``script_args`` unchanged
+        - If the user has already set any ``spark.openlineage.parent*`` properties in
+          ``--conf``, skips injection to preserve the user-provided values
+        - Appends to an existing ``--conf`` value if present (with a ``--conf`` prefix)
+        - Returns a new dict (does not mutate the original)
+
+    Args:
+        script_args: The Glue job's script_args dict.
+        context: Airflow task context.
+
+    Returns:
+        Modified script_args with OpenLineage Spark properties injected in ``--conf``.
+    """
+    info = get_parent_job_information(context)
+    if info is None:
+        return script_args
+
+    existing_conf = script_args.get("--conf", "")
+
+    # Skip if the user already configured OpenLineage parent properties
+    if "spark.openlineage.parent" in existing_conf:
+        log.debug(
+            "OpenLineage parent job properties already present in --conf. "
+            "Skipping injection to preserve user-provided values."
+        )
+        return script_args
+
+    ol_spark_properties = [
+        f"spark.openlineage.parentJobNamespace={info.parent_job_namespace}",
+        f"spark.openlineage.parentJobName={info.parent_job_name}",
+        f"spark.openlineage.parentRunId={info.parent_run_id}",
+        f"spark.openlineage.rootParentJobNamespace={info.root_parent_job_namespace}",
+        f"spark.openlineage.rootParentJobName={info.root_parent_job_name}",
+        f"spark.openlineage.rootParentRunId={info.root_parent_run_id}",
+    ]
+
+    # AWS Glue --conf format: "prop1=val1 --conf prop2=val2 --conf prop3=val3"
+    # First property has no --conf prefix, subsequent ones do
+    new_conf = " --conf ".join(ol_spark_properties)
+    if existing_conf:
+        new_conf = f"{existing_conf} --conf {new_conf}"
+
+    new_script_args = {**script_args}
+    new_script_args["--conf"] = new_conf
+
+    return new_script_args
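(To make the ``--conf`` layout built above concrete, a standalone sketch of the join logic; the property values are made up for the example.)

    # Illustration of the AWS Glue --conf format the function above produces;
    # values are placeholders.
    ol_spark_properties = [
        "spark.openlineage.parentJobNamespace=default",
        "spark.openlineage.parentJobName=dag_id.task_id",
        "spark.openlineage.parentRunId=uuid-123",
    ]
    existing_conf = "spark.executor.memory=4g"

    # First property carries no --conf prefix; the rest are joined with one.
    new_conf = " --conf ".join(ol_spark_properties)
    if existing_conf:
        new_conf = f"{existing_conf} --conf {new_conf}"

    # Result (a single line, wrapped here for readability):
    # spark.executor.memory=4g --conf spark.openlineage.parentJobNamespace=default
    #     --conf spark.openlineage.parentJobName=dag_id.task_id
    #     --conf spark.openlineage.parentRunId=uuid-123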
diff --git a/providers/amazon/tests/unit/amazon/aws/utils/test_openlineage.py b/providers/amazon/tests/unit/amazon/aws/utils/test_openlineage.py
index 3790a590238..ac58420bafc 100644
--- a/providers/amazon/tests/unit/amazon/aws/utils/test_openlineage.py
+++ b/providers/amazon/tests/unit/amazon/aws/utils/test_openlineage.py
@@ -17,6 +17,7 @@
 from __future__ import annotations

+from datetime import datetime
 from unittest import mock

 import pytest
@@ -26,6 +27,7 @@ from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
 from airflow.providers.amazon.aws.utils.openlineage import (
     get_facets_from_redshift_table,
     get_identity_column_lineage_facet,
+    inject_parent_job_information_into_glue_script_args,
 )
 from airflow.providers.common.compat.openlineage.facet import (
     ColumnLineageDatasetFacet,
@@ -34,6 +36,17 @@ from airflow.providers.common.compat.openlineage.facet import (
     InputField,
 )

+EXAMPLE_CONTEXT = {
+    "ti": mock.MagicMock(
+        dag_id="dag_id",
+        task_id="task_id",
+        try_number=1,
+        map_index=-1,
+        logical_date=datetime(2024, 11, 11),
+        dag_run=mock.MagicMock(logical_date=datetime(2024, 11, 11), clear_number=0),
+    )
+}
+

 @mock.patch("airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook.get_records")
 def test_get_facets_from_redshift_table_sql_hook(mock_get_records):
@@ -168,3 +181,134 @@ def test_get_identity_column_lineage_facet_no_input_datasets():
         ValueError, match="When providing `field_names` You must provide at least one `input_dataset`."
     ):
         get_identity_column_lineage_facet(field_names=field_names, input_datasets=input_datasets)
+
+
+class TestInjectParentJobInformationIntoGlueScriptArgs:
+    """Tests for injecting OpenLineage parent job info via --conf Spark properties."""
+
+    @mock.patch(
+        "airflow.providers.amazon.aws.utils.openlineage.get_parent_job_information",
+        return_value=None,
+    )
+    def test_skips_when_openlineage_not_available(self, mock_get_parent_info):
+        """Returns unchanged script_args when OpenLineage is not available."""
+        result = inject_parent_job_information_into_glue_script_args({}, EXAMPLE_CONTEXT)
+        assert result == {}
+
+    @mock.patch("airflow.providers.amazon.aws.utils.openlineage.get_parent_job_information")
+    def test_injects_into_empty_script_args(self, mock_get_parent_info):
+        """Injects --conf with Spark OpenLineage properties into empty script_args."""
+        from airflow.providers.openlineage.utils.spark import ParentJobInformation
+
+        mock_get_parent_info.return_value = ParentJobInformation(
+            parent_job_namespace="default",
+            parent_job_name="dag_id.task_id",
+            parent_run_id="uuid-123",
+            root_parent_job_namespace="default",
+            root_parent_job_name="dag_id",
+            root_parent_run_id="uuid-456",
+        )
+
+        result = inject_parent_job_information_into_glue_script_args({}, EXAMPLE_CONTEXT)
+
+        assert "--conf" in result
+        conf = result["--conf"]
+        # Verify AWS Glue --conf format: "prop1=val1 --conf prop2=val2 --conf ..."
+        assert "spark.openlineage.parentJobNamespace=default" in conf
+        assert "spark.openlineage.parentJobName=dag_id.task_id" in conf
+        assert "spark.openlineage.parentRunId=uuid-123" in conf
+        assert "spark.openlineage.rootParentJobNamespace=default" in conf
+        assert "spark.openlineage.rootParentJobName=dag_id" in conf
+        assert "spark.openlineage.rootParentRunId=uuid-456" in conf
+        # Verify the format uses " --conf " as separator between properties
+        assert " --conf " in conf
+
+    @mock.patch("airflow.providers.amazon.aws.utils.openlineage.get_parent_job_information")
+    def test_appends_to_existing_conf(self, mock_get_parent_info):
+        """Appends OpenLineage properties to existing --conf value with --conf prefix."""
+        from airflow.providers.openlineage.utils.spark import ParentJobInformation
+
+        mock_get_parent_info.return_value = ParentJobInformation(
+            parent_job_namespace="default",
+            parent_job_name="dag.task",
+            parent_run_id="uuid-123",
+            root_parent_job_namespace="default",
+            root_parent_job_name="dag",
+            root_parent_run_id="uuid-456",
+        )
+
+        existing = {"--conf": "spark.executor.memory=4g --conf spark.driver.memory=2g"}
+        result = inject_parent_job_information_into_glue_script_args(existing, EXAMPLE_CONTEXT)
+
+        conf = result["--conf"]
+        # Original conf preserved at the beginning
+        assert conf.startswith("spark.executor.memory=4g --conf spark.driver.memory=2g")
+        # OpenLineage properties added with --conf prefix
+        assert " --conf spark.openlineage.parentJobName=dag.task" in conf
+
+    @mock.patch("airflow.providers.amazon.aws.utils.openlineage.get_parent_job_information")
+    def test_skips_when_ol_properties_already_in_conf(self, mock_get_parent_info):
+        """Skips injection if user already set spark.openlineage.parent* properties."""
+        from airflow.providers.openlineage.utils.spark import ParentJobInformation
+
+        mock_get_parent_info.return_value = ParentJobInformation(
+            parent_job_namespace="default",
+            parent_job_name="dag.task",
+            parent_run_id="uuid-123",
+            root_parent_job_namespace="default",
+            root_parent_job_name="dag",
+            root_parent_run_id="uuid-456",
+        )
+
+        # User already set their own parent job info
+        existing = {"--conf": "spark.openlineage.parentJobName=custom_job"}
+        result = inject_parent_job_information_into_glue_script_args(existing, EXAMPLE_CONTEXT)
+
+        # Should return unchanged - user's values preserved
+        assert result == existing
+
+    @mock.patch("airflow.providers.amazon.aws.utils.openlineage.get_parent_job_information")
+    def test_preserves_other_script_args(self, mock_get_parent_info):
+        """Preserves other script_args while adding --conf."""
+        from airflow.providers.openlineage.utils.spark import ParentJobInformation
+
+        mock_get_parent_info.return_value = ParentJobInformation(
+            parent_job_namespace="default",
+            parent_job_name="dag.task",
+            parent_run_id="uuid-123",
+            root_parent_job_namespace="default",
+            root_parent_job_name="dag",
+            root_parent_run_id="uuid-456",
+        )
+
+        existing = {"--input": "s3://bucket/input", "--output": "s3://bucket/output"}
+        result = inject_parent_job_information_into_glue_script_args(existing, EXAMPLE_CONTEXT)
+
+        assert result["--input"] == "s3://bucket/input"
+        assert result["--output"] == "s3://bucket/output"
+        assert "--conf" in result
+
+    @mock.patch("airflow.providers.amazon.aws.utils.openlineage.get_parent_job_information")
+    def test_does_not_mutate_original_script_args(self, mock_get_parent_info):
+        """Ensures original script_args dict is not mutated."""
+        from airflow.providers.openlineage.utils.spark import ParentJobInformation
+
+        mock_get_parent_info.return_value = ParentJobInformation(
+            parent_job_namespace="default",
+            parent_job_name="dag.task",
+            parent_run_id="uuid-123",
+            root_parent_job_namespace="default",
+            root_parent_job_name="dag",
+            root_parent_run_id="uuid-456",
+        )
+
+        original = {"--input": "s3://bucket/input"}
+        original_copy = dict(original)
+        result = inject_parent_job_information_into_glue_script_args(original, EXAMPLE_CONTEXT)
+
+        # Original unchanged
+        assert original == original_copy
+        # Result is different object
+        assert result is not original
+        assert "--conf" in result
+        assert "--conf" not in original
diff --git a/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py b/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
index 1028bf3debf..414acce2081 100644
--- a/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
+++ b/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
@@ -24,12 +24,14 @@ log = logging.getLogger(__name__)
 if TYPE_CHECKING:
     from airflow.providers.openlineage.utils.spark import (
+        get_parent_job_information,
         inject_parent_job_information_into_spark_properties,
         inject_transport_information_into_spark_properties,
     )
     from airflow.sdk import Context

 try:
     from airflow.providers.openlineage.utils.spark import (
+        get_parent_job_information,
         inject_parent_job_information_into_spark_properties,
         inject_transport_information_into_spark_properties,
     )
@@ -49,8 +51,12 @@ except ImportError:
         )
         return properties

+    def get_parent_job_information(context: Context) -> None:
+        return None
+

 __all__ = [
     "inject_parent_job_information_into_spark_properties",
     "inject_transport_information_into_spark_properties",
+    "get_parent_job_information",
 ]
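(The compat shim above lets callers outside the openlineage provider degrade gracefully; a sketch of the consumer-side pattern, assuming only the import and the None fallback defined in this diff. The helper name is hypothetical.)

    # Consumer-side sketch: works whether or not a recent openlineage provider
    # (one that exposes get_parent_job_information) is installed.
    from airflow.providers.common.compat.openlineage.utils.spark import (
        get_parent_job_information,
    )

    def maybe_collect_lineage_args(context):  # hypothetical helper
        info = get_parent_job_information(context)
        if info is None:
            # OpenLineage provider absent or too old: skip injection entirely.
            return {}
        return {"parent_job_name": info.parent_job_name}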
diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py b/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
index a92ac25eab2..438789e0c9c 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
@@ -18,13 +18,14 @@
 from __future__ import annotations

 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, NamedTuple

 from airflow.providers.openlineage.plugins.listener import get_openlineage_listener
 from airflow.providers.openlineage.plugins.macros import (
     lineage_job_name,
     lineage_job_namespace,
     lineage_root_job_name,
+    lineage_root_job_namespace,
     lineage_root_run_id,
     lineage_run_id,
 )
@@ -35,6 +36,43 @@ if TYPE_CHECKING:
 log = logging.getLogger(__name__)


+class ParentJobInformation(NamedTuple):
+    """Container for OpenLineage parent job information."""
+
+    parent_job_namespace: str
+    parent_job_name: str
+    parent_run_id: str
+    root_parent_job_namespace: str
+    root_parent_job_name: str
+    root_parent_run_id: str
+
+
+def get_parent_job_information(context: Context) -> ParentJobInformation | None:
+    """
+    Retrieve parent job information from the Airflow context.
+
+    This function extracts OpenLineage parent job details from the task instance,
+    which can be used by various integrations (Spark, Glue, etc.) to propagate
+    lineage information to child jobs.
+
+    Args:
+        context: The Airflow context containing task instance information.
+
+    Returns:
+        ParentJobInformation containing namespace, job name, and run IDs
+        for both parent and root parent.
+    """
+    ti = context["ti"]
+    return ParentJobInformation(
+        parent_job_namespace=lineage_job_namespace(),
+        parent_job_name=lineage_job_name(ti),  # type: ignore[arg-type]
+        parent_run_id=lineage_run_id(ti),  # type: ignore[arg-type]
+        root_parent_job_namespace=lineage_root_job_namespace(),
+        root_parent_job_name=lineage_root_job_name(ti),  # type: ignore[arg-type]
+        root_parent_run_id=lineage_root_run_id(ti),  # type: ignore[arg-type]
+    )
+
+
 def _get_parent_job_information_as_spark_properties(context: Context) -> dict:
     """
     Retrieve parent job information as Spark properties.
@@ -45,14 +83,14 @@ def _get_parent_job_information_as_spark_properties(context: Context) -> dict:
     Returns:
         Spark properties with the parent job information.
     """
-    ti = context["ti"]
+    info = get_parent_job_information(context)
     return {
-        "spark.openlineage.parentJobNamespace": lineage_job_namespace(),
-        "spark.openlineage.parentJobName": lineage_job_name(ti),  # type: ignore[arg-type]
-        "spark.openlineage.parentRunId": lineage_run_id(ti),  # type: ignore[arg-type]
-        "spark.openlineage.rootParentRunId": lineage_root_run_id(ti),  # type: ignore[arg-type]
-        "spark.openlineage.rootParentJobName": lineage_root_job_name(ti),  # type: ignore[arg-type]
-        "spark.openlineage.rootParentJobNamespace": lineage_job_namespace(),
+        "spark.openlineage.parentJobNamespace": info.parent_job_namespace,
+        "spark.openlineage.parentJobName": info.parent_job_name,
+        "spark.openlineage.parentRunId": info.parent_run_id,
+        "spark.openlineage.rootParentRunId": info.root_parent_run_id,
+        "spark.openlineage.rootParentJobName": info.root_parent_job_name,
+        "spark.openlineage.rootParentJobNamespace": info.root_parent_job_namespace,
     }
diff --git a/scripts/ci/prek/check_contextmanager_class_decorators.py b/scripts/ci/prek/check_contextmanager_class_decorators.py
old mode 100644
new mode 100755
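(As a quick shape check on the new NamedTuple, a sketch constructing ParentJobInformation with placeholder values mirroring the test fixtures above; it assumes the openlineage provider from this commit is installed.)

    # Placeholder values only; not real run IDs.
    from airflow.providers.openlineage.utils.spark import ParentJobInformation

    info = ParentJobInformation(
        parent_job_namespace="default",
        parent_job_name="dag_id.task_id",
        parent_run_id="uuid-123",
        root_parent_job_namespace="default",
        root_parent_job_name="dag_id",
        root_parent_run_id="uuid-456",
    )

    # The same object can back both integrations touched in this commit:
    # the Spark properties dict and the Glue --conf string.
    spark_properties = {
        "spark.openlineage.parentJobName": info.parent_job_name,
        "spark.openlineage.parentRunId": info.parent_run_id,
    }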
