kacpermuda commented on code in PR #59521:
URL: https://github.com/apache/airflow/pull/59521#discussion_r2676561907


##########
providers/openlineage/src/airflow/providers/openlineage/utils/utils.py:
##########
@@ -1000,6 +1003,358 @@ def get_task_duration(ti):
     }
 
 
def is_dag_run_asset_triggered(
    dag_run: DagRun,
):
    """Check whether an asset (dataset on Airflow 2) triggered the given DAG run."""
    if not AIRFLOW_V_3_0_PLUS:
        # Airflow 2 has no `triggered_by` column; asset (dataset) triggering
        # is encoded in the run type instead.
        from airflow.models.dagrun import DagRunType

        return dag_run.run_type == DagRunType.DATASET_TRIGGERED  # type: ignore[attr-defined]  # This attr is available on AF2, but mypy can't see it

    # Airflow 3 records the trigger source explicitly on the run.
    from airflow.utils.types import DagRunTriggeredByType

    return dag_run.triggered_by == DagRunTriggeredByType.ASSET
+
def build_task_instance_ol_run_id(
    dag_id: str,
    task_id: str,
    try_number: int,
    logical_date: datetime.datetime,
    map_index: int,
):
    """
    Build the deterministic OpenLineage run ID of a task instance.

    The ID is a static UUID seeded with the namespace, DAG id, task id,
    try number and map index, anchored at the run's logical date — so
    repeated calls with identical inputs always produce the same value.

    Args:
        dag_id: The DAG identifier.
        task_id: The task identifier.
        try_number: The task try number.
        logical_date: The logical execution date from dagrun.
        map_index: The task map index.

    Returns:
        A deterministic OpenLineage run ID for the task instance.
    """
    seed = f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}.{map_index}"
    static_uuid = generate_static_uuid(instant=logical_date, data=seed.encode())
    return str(static_uuid)
+
+
+def is_valid_uuid(uuid_string: str | None) -> bool:
+    """Validate that a string is a valid UUID format."""
+    if uuid_string is None:
+        return False
+    try:
+        from uuid import UUID
+
+        UUID(uuid_string)
+        return True
+    except (ValueError, TypeError):
+        return False
+
+
def build_dag_run_ol_run_id(dag_id: str, logical_date: datetime.datetime, clear_number: int) -> str:
    """
    Build the deterministic OpenLineage run ID of a DAG run.

    The ID is a static UUID seeded with the namespace, DAG id and clear
    number, anchored at the logical date — identical inputs always yield
    the same value.

    Args:
        dag_id: The DAG identifier.
        logical_date: The logical execution date.
        clear_number: The DAG run clear number.

    Returns:
        A deterministic OpenLineage run ID for the DAG run.
    """
    seed = f"{conf.namespace()}.{dag_id}.{clear_number}".encode()
    return str(generate_static_uuid(instant=logical_date, data=seed))
+
+
def _get_eagerly_loaded_dagrun_consumed_asset_events(dag_id: str, dag_run_id: str) -> list[AssetEvent]:
    """
    Retrieve consumed asset events for a DagRun with relationships eagerly loaded.

    Downstream code accesses source_task_instance, source_dag_run, and asset on each AssetEvent.
    These relationships are lazy-loaded by default, which could cause N+1 query problem
    (2 + 3*N queries for N events). Using `joinedload` fetches everything in a single query.
    The returned AssetEvent objects have all needed relationships pre-populated in memory,
    so they can be safely used after the session is closed.

    Args:
        dag_id: The DAG identifier of the run to look up.
        dag_run_id: The run_id of the DagRun whose consumed events are wanted.

    Returns:
        AssetEvent objects with populated relationships, or empty list if DagRun not found.
    """
    # This should only be used on scheduler, so DB access is allowed
    from sqlalchemy import select
    from sqlalchemy.orm import joinedload

    from airflow.utils.session import create_session

    if AIRFLOW_V_3_0_PLUS:
        # Airflow 3: events live under the `asset` naming.
        from airflow.models.asset import AssetEvent

        options = (
            joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.source_dag_run),
            joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.source_task_instance),
            joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.asset),
        )

    else:  # AF2 path
        # Airflow 2: the same relationships exist under the legacy `dataset` naming.
        from airflow.models.dataset import DatasetEvent

        options = (
            joinedload(DagRun.consumed_dataset_events).joinedload(DatasetEvent.source_dag_run),
            joinedload(DagRun.consumed_dataset_events).joinedload(DatasetEvent.source_task_instance),
            joinedload(DagRun.consumed_dataset_events).joinedload(DatasetEvent.dataset),
        )

    with create_session() as session:
        # Single query: the DagRun plus all consumed events and their related
        # rows, so nothing needs to lazy-load after the session closes.
        dag_run_with_events = session.scalar(
            select(DagRun).where(DagRun.dag_id == dag_id).where(DagRun.run_id == dag_run_id).options(*options)
        )

    if not dag_run_with_events:
        return []

    # Accessing these relationship attributes after the session exit is safe
    # because `joinedload` above populated them eagerly.
    if AIRFLOW_V_3_0_PLUS:
        events = dag_run_with_events.consumed_asset_events
    else:  # AF2 path
        events = dag_run_with_events.consumed_dataset_events

    return events
+
+
+def _extract_ol_info_from_asset_event(asset_event: AssetEvent) -> dict[str, 
str] | None:
+    """
+    Extract OpenLineage job information from an AssetEvent.
+
+    Information is gathered from multiple potential sources, checked in 
priority
+    order:
+    1. TaskInstance (primary): Provides the most complete and reliable context.
+    2. AssetEvent source fields (fallback): Offers basic `dag_id.task_id` 
metadata.
+    3. `asset_event.extra["openlineage"]` (last resort): May include user 
provided OpenLineage details.
+
+    Args:
+        asset_event: The AssetEvent from which to extract job information.
+
+    Returns:
+        A dictionary containing `job_name`, `job_namespace`, and optionally
+        `run_id`, or `None` if insufficient information is available.
+    """
+    # First check for TaskInstance
+    if ti := asset_event.source_task_instance:
+        result = {
+            "job_name": get_job_name(ti),
+            "job_namespace": conf.namespace(),
+        }
+        source_dr = asset_event.source_dag_run
+        if source_dr:
+            logical_date = source_dr.logical_date  # Get logical date from 
DagRun for OL run_id generation
+            if AIRFLOW_V_3_0_PLUS and logical_date is None:
+                logical_date = source_dr.run_after
+            if logical_date is not None:
+                result["run_id"] = build_task_instance_ol_run_id(
+                    dag_id=ti.dag_id,
+                    task_id=ti.task_id,
+                    try_number=ti.try_number,
+                    logical_date=logical_date,
+                    map_index=ti.map_index,
+                )
+        return result
+
+    # Then, check AssetEvent source_* fields
+    if asset_event.source_dag_id and asset_event.source_task_id:
+        return {
+            "job_name": 
f"{asset_event.source_dag_id}.{asset_event.source_task_id}",
+            "job_namespace": conf.namespace(),
+            # run_id cannot be constructed from these fields alone
+        }

Review Comment:
   Not sure how possible/common that path is — I assume that for Airflow-produced asset events we will always take the first branch, but I kept this one as a backup. I think even this job-level correlation is worth having, even without a specific run_id.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to