This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1edb5136f29 nit: task-level facets should not overwrite integration-level facets (#51690)
1edb5136f29 is described below

commit 1edb5136f298112a0e7208dcf0d7c08ea5332992
Author: Kacper Muda <[email protected]>
AuthorDate: Fri Jun 13 15:15:58 2025 +0200

    nit: task-level facets should not overwrite integration-level facets (#51690)
---
 .../providers/openlineage/plugins/adapter.py       | 24 +++++----
 .../tests/unit/openlineage/plugins/test_adapter.py | 63 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 10 deletions(-)

diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
index fd73a56a917..613a71b4fdc 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
@@ -497,7 +497,9 @@ class OpenLineageAdapter(LoggingMixin):
         nominal_end_time: str | None = None,
         run_facets: dict[str, RunFacet] | None = None,
     ) -> Run:
-        facets: dict[str, RunFacet] = get_processing_engine_facet()  # type: ignore[assignment]
+        facets: dict[str, RunFacet] = {}
+        if run_facets:
+            facets.update(run_facets)
         if nominal_start_time:
             facets.update(
                 {
@@ -508,8 +510,7 @@ class OpenLineageAdapter(LoggingMixin):
                     )
                 }
             )
-        if run_facets:
-            facets.update(run_facets)
+        facets.update(get_processing_engine_facet())
 
         return Run(run_id, facets)
 
@@ -522,11 +523,9 @@ class OpenLineageAdapter(LoggingMixin):
         job_tags: list[str] | None = None,
         job_facets: dict[str, JobFacet] | None = None,
     ):
-        facets: dict[str, JobFacet] = {
-            "jobType": job_type_job.JobTypeJobFacet(
-                jobType=job_type, integration="AIRFLOW", processingType="BATCH", producer=_PRODUCER
-            )
-        }
+        facets: dict[str, JobFacet] = {}
+        if job_facets:
+            facets.update(job_facets)
         if job_description:
             facets.update(
                 {
@@ -560,7 +559,12 @@ class OpenLineageAdapter(LoggingMixin):
                     )
                 }
             )
-        if job_facets:
-            facets.update(job_facets)
+        facets.update(
+            {
+                "jobType": job_type_job.JobTypeJobFacet(
+                    jobType=job_type, integration="AIRFLOW", processingType="BATCH", producer=_PRODUCER
+                )
+            }
+        )
 
         return Job(namespace=conf.namespace(), name=job_name, facets=facets)
diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py
index a1b3ecb4675..f59a9b8d62c 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py
@@ -1316,3 +1316,66 @@ def test_configuration_precedence_when_creating_ol_client():
         ):
             client = OpenLineageAdapter().get_or_create_openlineage_client()
             assert client.transport.kind == "console"
+
+
+def test_adapter_build_run():
+    run_id = str(uuid.uuid4())
+    result = OpenLineageAdapter._build_run(
+        run_id=run_id,
+        nominal_start_time=datetime.datetime(2022, 1, 1).isoformat(),
+        nominal_end_time=datetime.datetime(2022, 1, 1).isoformat(),
+        run_facets={
+            "my_custom_facet": external_query_run.ExternalQueryRunFacet(
+                externalQueryId="123", source="source"
+            ),
+            "processing_engine": "this_should_be_gone",
+        },
+    )
+    assert result.runId == run_id
+    assert result.facets == {
+        "my_custom_facet": external_query_run.ExternalQueryRunFacet(externalQueryId="123", source="source"),
+        "nominalTime": nominal_time_run.NominalTimeRunFacet(
+            nominalStartTime="2022-01-01T00:00:00",
+            nominalEndTime="2022-01-01T00:00:00",
+        ),
+        "processing_engine": processing_engine_run.ProcessingEngineRunFacet(
+            version=ANY, name="Airflow", openlineageAdapterVersion=ANY
+        ),
+    }
+
+
+def test_adapter_build_job():
+    result = OpenLineageAdapter._build_job(
+        job_name="job_name",
+        job_type="TASK",
+        job_description="job_description",
+        job_owners=["def", "abc"],
+        job_tags=["tag2", "tag1"],
+        job_facets={
+            "my_custom_facet": sql_job.SQLJobFacet(query="sql"),
+            "jobType": "this_should_be_gone",
+            "documentation": "this_should_be_gone",
+            "ownership": "this_should_be_gone",
+            "tags": "this_should_be_gone",
+        },
+    )
+    assert result.name == "job_name"
+    assert result.facets == {
+        "my_custom_facet": sql_job.SQLJobFacet(query="sql"),
+        "documentation": documentation_job.DocumentationJobFacet(description="job_description"),
+        "ownership": ownership_job.OwnershipJobFacet(
+            owners=[
+                ownership_job.Owner(name="abc", type=None),
+                ownership_job.Owner(name="def", type=None),
+            ]
+        ),
+        "tags": tags_job.TagsJobFacet(
+            tags=[
+                tags_job.TagsJobFacetFields(key="tag1", value="tag1", source="AIRFLOW"),
+                tags_job.TagsJobFacetFields(key="tag2", value="tag2", source="AIRFLOW"),
+            ]
+        ),
+        "jobType": job_type_job.JobTypeJobFacet(
+            processingType="BATCH", integration="AIRFLOW", jobType="TASK"
+        ),
+    }

Reply via email to