kacpermuda commented on code in PR #47736:
URL: https://github.com/apache/airflow/pull/47736#discussion_r2020887324
##########
providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py:
##########
@@ -78,3 +95,230 @@ def fix_snowflake_sqlalchemy_uri(uri: str) -> str:
         hostname = fix_account_name(hostname)
     # else - its new hostname, just return it
     return urlunparse((parts.scheme, hostname, parts.path, parts.params, parts.query, parts.fragment))
+
+
+def _get_parent_run_facet(task_instance):
+    """
+    Retrieve the ParentRunFacet associated with a specific Airflow task instance.
+
+    This facet helps link OpenLineage events of child jobs - such as queries executed within
+    external systems (e.g., Snowflake) by the Airflow task - to the original Airflow task execution.
+    Establishing this connection enables better lineage tracking and observability.
+
+    It's crucial that the task_instance's run_id creation logic matches OpenLineage's listener implementation.
+    Only then can we ensure that the generated run_id aligns with the Airflow task,
+    enabling a proper connection between events.
+    """
+    from openlineage.client.facet_v2 import parent_run
+
+    from airflow.providers.openlineage.conf import namespace
+    from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
+
+    def _get_logical_date():
+        # todo: remove when min airflow version >= 3.0
+        if AIRFLOW_V_3_0_PLUS:
+            dagrun = task_instance.get_template_context()["dag_run"]
+            return dagrun.logical_date or dagrun.run_after
+
+        if hasattr(task_instance, "logical_date"):
+            date = task_instance.logical_date
+        else:
+            date = task_instance.execution_date
+
+        return date
+
+    def _get_try_number():
+        # todo: remove when min airflow version >= 2.10.0
+        if AIRFLOW_V_2_10_PLUS:
+            return task_instance.try_number
+        return task_instance.try_number - 1
+
+    # Generate same OL run id as is generated for current task instance
+    parent_run_id = OpenLineageAdapter.build_task_instance_run_id(
+        dag_id=task_instance.dag_id,
+        task_id=task_instance.task_id,
+        logical_date=_get_logical_date(),
+        try_number=_get_try_number(),
+        map_index=task_instance.map_index,
+    )
Review Comment:
Makes sense. For now I've split this out into a separate function and added more
tests, but in a follow-up PR I'll probably move this logic to the OL provider
(either expose a function that creates the parent_run facet, or expose an API for
emitting events with the option to attach a ParentRunFacet). That way it will be
easier for other providers to emit OL events, and the run_id logic will live in
one place.
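A minimal sketch of what a single shared helper in the OL provider could look like, assuming the goal is that every caller derives the parent run id from the same task-instance identity (dag_id, task_id, logical date, try number, map index). The names `build_task_instance_run_id`, `normalize_try_number` and `build_parent_run_facet` here are hypothetical, the uuid5-based id scheme is an illustrative stand-in (the provider's `OpenLineageAdapter.build_task_instance_run_id` may use a different algorithm), and the facet is a plain dict shaped like OpenLineage's ParentRunFacet (`run.runId`, `job.namespace`, `job.name`) rather than the `openlineage.client.facet_v2.parent_run` classes, so it runs without the client installed.

```python
import uuid
from datetime import datetime, timezone


# Hypothetical id scheme for illustration: a name-based uuid5 over the task
# instance's identity. The real OpenLineageAdapter.build_task_instance_run_id
# may use a different algorithm; what matters is that every caller uses the same one.
def build_task_instance_run_id(
    dag_id: str, task_id: str, logical_date: datetime, try_number: int, map_index: int
) -> str:
    key = f"{dag_id}.{task_id}.{logical_date.isoformat()}.{try_number}.{map_index}"
    return str(uuid.uuid5(uuid.NAMESPACE_URL, key))


def normalize_try_number(try_number: int, airflow_version: tuple[int, int]) -> int:
    # Same rule as _get_try_number() in the diff: subtract 1 before Airflow 2.10.
    if airflow_version >= (2, 10):
        return try_number
    return try_number - 1


def build_parent_run_facet(
    namespace: str,
    dag_id: str,
    task_id: str,
    logical_date: datetime,
    try_number: int,
    map_index: int,
) -> dict:
    # Plain-dict stand-in shaped like OpenLineage's ParentRunFacet
    # (run.runId, job.namespace, job.name); the "<dag_id>.<task_id>" job
    # name is an assumption of this sketch.
    run_id = build_task_instance_run_id(dag_id, task_id, logical_date, try_number, map_index)
    return {
        "run": {"runId": run_id},
        "job": {"namespace": namespace, "name": f"{dag_id}.{task_id}"},
    }


if __name__ == "__main__":
    when = datetime(2025, 3, 1, tzinfo=timezone.utc)
    facet = build_parent_run_facet("default", "my_dag", "snowflake_query", when, 1, -1)
    # A listener deriving the id from the same inputs lands on the same runId,
    # which is what lets child Snowflake query events link back to the task.
    assert facet["run"]["runId"] == build_task_instance_run_id(
        "my_dag", "snowflake_query", when, 1, -1
    )
    print(facet)
```

Keeping the id derivation in one function means the Snowflake provider (or any other) only passes task-instance fields in and never reimplements the run_id rules, which is the consolidation the comment describes.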