This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3284b8d4764 fix: Adjust OpenLineage task state check for Airflow 3 (#50380)
3284b8d4764 is described below
commit 3284b8d476408fbb68eafd64c725e5ab23352d36
Author: Kacper Muda <[email protected]>
AuthorDate: Fri May 9 10:26:24 2025 +0200
fix: Adjust OpenLineage task state check for Airflow 3 (#50380)
---
.../snowflake/src/airflow/providers/snowflake/utils/openlineage.py | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py b/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
index b8476d79b06..87afcb11752 100644
--- a/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
+++ b/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
@@ -301,8 +301,9 @@ def emit_openlineage_events_for_snowflake_queries(
# If real metadata is unavailable, we send events with eventTime=now
default_event_time = timezone.utcnow()
# If no query metadata is provided, we use task_instance's state when checking for success
- default_state = str(task_instance.state) if hasattr(task_instance, "state") else ""
+ default_state = task_instance.state.value if hasattr(task_instance, "state") else ""
+ log.debug("Generating OpenLineage facets")
common_run_facets = {"parent": _get_parent_run_facet(task_instance)}
common_job_facets: dict[str, JobFacet] = {
"jobType": job_type_job.JobTypeJobFacet(