kacpermuda commented on code in PR #47736:
URL: https://github.com/apache/airflow/pull/47736#discussion_r2020887324


##########
providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py:
##########
@@ -78,3 +95,230 @@ def fix_snowflake_sqlalchemy_uri(uri: str) -> str:
     hostname = fix_account_name(hostname)
     # else - its new hostname, just return it
     return urlunparse((parts.scheme, hostname, parts.path, parts.params, 
parts.query, parts.fragment))
+
+
+def _get_parent_run_facet(task_instance):
+    """
+    Retrieve the ParentRunFacet associated with a specific Airflow task 
instance.
+
+    This facet helps link OpenLineage events of child jobs - such as queries 
executed within
+    external systems (e.g., Snowflake) by the Airflow task - to the original 
Airflow task execution.
+    Establishing this connection enables better lineage tracking and 
observability.
+
+    It's crucial that the task_instance's run_id creation logic matches 
OpenLineage's listener implementation.
+    Only then can we ensure that the generated run_id aligns with the Airflow 
task,
+    enabling a proper connection between events.
+    """
+    from openlineage.client.facet_v2 import parent_run
+
+    from airflow.providers.openlineage.conf import namespace
+    from airflow.providers.openlineage.plugins.adapter import 
OpenLineageAdapter
+
+    def _get_logical_date():
+        # todo: remove when min airflow version >= 3.0
+        if AIRFLOW_V_3_0_PLUS:
+            dagrun = task_instance.get_template_context()["dag_run"]
+            return dagrun.logical_date or dagrun.run_after
+
+        if hasattr(task_instance, "logical_date"):
+            date = task_instance.logical_date
+        else:
+            date = task_instance.execution_date
+
+        return date
+
+    def _get_try_number():
+        # todo: remove when min airflow version >= 2.10.0
+        if AIRFLOW_V_2_10_PLUS:
+            return task_instance.try_number
+        return task_instance.try_number - 1
+
+    # Generate same OL run id as is generated for current task instance
+    parent_run_id = OpenLineageAdapter.build_task_instance_run_id(
+        dag_id=task_instance.dag_id,
+        task_id=task_instance.task_id,
+        logical_date=_get_logical_date(),
+        try_number=_get_try_number(),
+        map_index=task_instance.map_index,
+    )

Review Comment:
   Makes sense. For now I've separated this function and added some more tests, 
but in a separate PR I'll probably move this logic to the OL provider (expose a 
function to create the parent_run facet, or expose some API to emit events with 
the possibility to attach a parentRunFacet). This way, it's easier to emit OL 
events for other providers and we'll have the run_id logic in one place.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to