This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1edb5136f29 nit: task-level facets should not overwrite integration-level facets (#51690)
1edb5136f29 is described below
commit 1edb5136f298112a0e7208dcf0d7c08ea5332992
Author: Kacper Muda <[email protected]>
AuthorDate: Fri Jun 13 15:15:58 2025 +0200
nit: task-level facets should not overwrite integration-level facets (#51690)
---
.../providers/openlineage/plugins/adapter.py | 24 +++++----
.../tests/unit/openlineage/plugins/test_adapter.py | 63 ++++++++++++++++++++++
2 files changed, 77 insertions(+), 10 deletions(-)
diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
index fd73a56a917..613a71b4fdc 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
@@ -497,7 +497,9 @@ class OpenLineageAdapter(LoggingMixin):
nominal_end_time: str | None = None,
run_facets: dict[str, RunFacet] | None = None,
) -> Run:
- facets: dict[str, RunFacet] = get_processing_engine_facet() # type:
ignore[assignment]
+ facets: dict[str, RunFacet] = {}
+ if run_facets:
+ facets.update(run_facets)
if nominal_start_time:
facets.update(
{
@@ -508,8 +510,7 @@ class OpenLineageAdapter(LoggingMixin):
)
}
)
- if run_facets:
- facets.update(run_facets)
+ facets.update(get_processing_engine_facet())
return Run(run_id, facets)
@@ -522,11 +523,9 @@ class OpenLineageAdapter(LoggingMixin):
job_tags: list[str] | None = None,
job_facets: dict[str, JobFacet] | None = None,
):
- facets: dict[str, JobFacet] = {
- "jobType": job_type_job.JobTypeJobFacet(
- jobType=job_type, integration="AIRFLOW",
processingType="BATCH", producer=_PRODUCER
- )
- }
+ facets: dict[str, JobFacet] = {}
+ if job_facets:
+ facets.update(job_facets)
if job_description:
facets.update(
{
@@ -560,7 +559,12 @@ class OpenLineageAdapter(LoggingMixin):
)
}
)
- if job_facets:
- facets.update(job_facets)
+ facets.update(
+ {
+ "jobType": job_type_job.JobTypeJobFacet(
+ jobType=job_type, integration="AIRFLOW",
processingType="BATCH", producer=_PRODUCER
+ )
+ }
+ )
return Job(namespace=conf.namespace(), name=job_name, facets=facets)
diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py
index a1b3ecb4675..f59a9b8d62c 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py
@@ -1316,3 +1316,66 @@ def test_configuration_precedence_when_creating_ol_client():
):
client = OpenLineageAdapter().get_or_create_openlineage_client()
assert client.transport.kind == "console"
+
+
+def test_adapter_build_run():
+ run_id = str(uuid.uuid4())
+ result = OpenLineageAdapter._build_run(
+ run_id=run_id,
+ nominal_start_time=datetime.datetime(2022, 1, 1).isoformat(),
+ nominal_end_time=datetime.datetime(2022, 1, 1).isoformat(),
+ run_facets={
+ "my_custom_facet": external_query_run.ExternalQueryRunFacet(
+ externalQueryId="123", source="source"
+ ),
+ "processing_engine": "this_should_be_gone",
+ },
+ )
+ assert result.runId == run_id
+ assert result.facets == {
+ "my_custom_facet":
external_query_run.ExternalQueryRunFacet(externalQueryId="123",
source="source"),
+ "nominalTime": nominal_time_run.NominalTimeRunFacet(
+ nominalStartTime="2022-01-01T00:00:00",
+ nominalEndTime="2022-01-01T00:00:00",
+ ),
+ "processing_engine": processing_engine_run.ProcessingEngineRunFacet(
+ version=ANY, name="Airflow", openlineageAdapterVersion=ANY
+ ),
+ }
+
+
+def test_adapter_build_job():
+ result = OpenLineageAdapter._build_job(
+ job_name="job_name",
+ job_type="TASK",
+ job_description="job_description",
+ job_owners=["def", "abc"],
+ job_tags=["tag2", "tag1"],
+ job_facets={
+ "my_custom_facet": sql_job.SQLJobFacet(query="sql"),
+ "jobType": "this_should_be_gone",
+ "documentation": "this_should_be_gone",
+ "ownership": "this_should_be_gone",
+ "tags": "this_should_be_gone",
+ },
+ )
+ assert result.name == "job_name"
+ assert result.facets == {
+ "my_custom_facet": sql_job.SQLJobFacet(query="sql"),
+ "documentation":
documentation_job.DocumentationJobFacet(description="job_description"),
+ "ownership": ownership_job.OwnershipJobFacet(
+ owners=[
+ ownership_job.Owner(name="abc", type=None),
+ ownership_job.Owner(name="def", type=None),
+ ]
+ ),
+ "tags": tags_job.TagsJobFacet(
+ tags=[
+ tags_job.TagsJobFacetFields(key="tag1", value="tag1",
source="AIRFLOW"),
+ tags_job.TagsJobFacetFields(key="tag2", value="tag2",
source="AIRFLOW"),
+ ]
+ ),
+ "jobType": job_type_job.JobTypeJobFacet(
+ processingType="BATCH", integration="AIRFLOW", jobType="TASK"
+ ),
+ }