kacpermuda commented on code in PR #59521:
URL: https://github.com/apache/airflow/pull/59521#discussion_r2676568985


##########
providers/openlineage/src/airflow/providers/openlineage/utils/utils.py:
##########
@@ -1000,6 +1003,358 @@ def get_task_duration(ti):
     }
 
 
+def is_dag_run_asset_triggered(
+    dag_run: DagRun,
+):
+    """Return whether the given DAG run was triggered by an asset."""
+    if AIRFLOW_V_3_0_PLUS:
+        from airflow.utils.types import DagRunTriggeredByType
+
+        return dag_run.triggered_by == DagRunTriggeredByType.ASSET
+
+    # AF 2 Path
+    from airflow.models.dagrun import DagRunType
+
+    return dag_run.run_type == DagRunType.DATASET_TRIGGERED  # type: 
ignore[attr-defined]  # This attr is available on AF2, but mypy can't see it
+
+
+def build_task_instance_ol_run_id(
+    dag_id: str,
+    task_id: str,
+    try_number: int,
+    logical_date: datetime.datetime,
+    map_index: int,
+):
+    """
+    Generate a deterministic OpenLineage run ID for a task instance.
+
+    Args:
+        dag_id: The DAG identifier.
+        task_id: The task identifier.
+        try_number: The task try number.
+        logical_date: The logical execution date from dagrun.
+        map_index: The task map index.
+
+    Returns:
+        A deterministic OpenLineage run ID for the task instance.
+    """
+    return str(
+        generate_static_uuid(
+            instant=logical_date,
+            
data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}.{map_index}".encode(),
+        )
+    )
+
+
+def is_valid_uuid(uuid_string: str | None) -> bool:
+    """Validate that a string is a valid UUID format."""
+    if uuid_string is None:
+        return False
+    try:
+        from uuid import UUID
+
+        UUID(uuid_string)
+        return True
+    except (ValueError, TypeError):
+        return False
+
+
+def build_dag_run_ol_run_id(dag_id: str, logical_date: datetime.datetime, 
clear_number: int) -> str:
+    """
+    Generate a deterministic OpenLineage run ID for a DAG run.
+
+    Args:
+        dag_id: The DAG identifier.
+        logical_date: The logical execution date.
+        clear_number: The DAG run clear number.
+
+    Returns:
+        A deterministic OpenLineage run ID for the DAG run.
+    """
+    return str(
+        generate_static_uuid(
+            instant=logical_date,
+            data=f"{conf.namespace()}.{dag_id}.{clear_number}".encode(),
+        )
+    )
+
+
+def _get_eagerly_loaded_dagrun_consumed_asset_events(dag_id: str, dag_run_id: 
str) -> list[AssetEvent]:
+    """
+    Retrieve consumed asset events for a DagRun with relationships eagerly 
loaded.
+
+    Downstream code accesses source_task_instance, source_dag_run, and asset 
on each AssetEvent.
+    These relationships are lazy-loaded by default, which could cause N+1 
query problem
+    (2 + 3*N queries for N events). Using `joinedload` fetches everything in a 
single query.
+    The returned AssetEvent objects have all needed relationships 
pre-populated in memory,
+    so they can be safely used after the session is closed.
+
+    Returns:
+        AssetEvent objects with populated relationships, or empty list if 
DagRun not found.
+    """
+    # This should only be used on scheduler, so DB access is allowed
+    from sqlalchemy import select
+    from sqlalchemy.orm import joinedload
+
+    from airflow.utils.session import create_session
+
+    if AIRFLOW_V_3_0_PLUS:
+        from airflow.models.asset import AssetEvent
+
+        options = (
+            
joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.source_dag_run),
+            
joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.source_task_instance),
+            
joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.asset),
+        )
+
+    else:  # AF2 path
+        from airflow.models.dataset import DatasetEvent
+
+        options = (
+            
joinedload(DagRun.consumed_dataset_events).joinedload(DatasetEvent.source_dag_run),
+            
joinedload(DagRun.consumed_dataset_events).joinedload(DatasetEvent.source_task_instance),
+            
joinedload(DagRun.consumed_dataset_events).joinedload(DatasetEvent.dataset),
+        )
+
+    with create_session() as session:
+        dag_run_with_events = session.scalar(
+            select(DagRun).where(DagRun.dag_id == dag_id).where(DagRun.run_id 
== dag_run_id).options(*options)
+        )

Review Comment:
   I'm not sure either, so I followed the generic path of creating the session. If 
any objections come up, I'll adjust the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to