mobuchowski commented on code in PR #59521:
URL: https://github.com/apache/airflow/pull/59521#discussion_r2676295263


##########
providers/openlineage/src/airflow/providers/openlineage/utils/utils.py:
##########
@@ -1000,6 +1003,358 @@ def get_task_duration(ti):
     }
 
 
+def is_dag_run_asset_triggered(
+    dag_run: DagRun,
+):
+    """Return whether the given DAG run was triggered by an asset."""
+    if AIRFLOW_V_3_0_PLUS:
+        from airflow.utils.types import DagRunTriggeredByType
+
+        return dag_run.triggered_by == DagRunTriggeredByType.ASSET
+
+    # AF 2 Path
+    from airflow.models.dagrun import DagRunType
+
+    return dag_run.run_type == DagRunType.DATASET_TRIGGERED  # type: ignore[attr-defined]  # This attr is available on AF2, but mypy can't see it
+
+
+def build_task_instance_ol_run_id(
+    dag_id: str,
+    task_id: str,
+    try_number: int,
+    logical_date: datetime.datetime,
+    map_index: int,
+):
+    """
+    Generate a deterministic OpenLineage run ID for a task instance.
+
+    Args:
+        dag_id: The DAG identifier.
+        task_id: The task identifier.
+        try_number: The task try number.
+        logical_date: The logical execution date from the DagRun.
+        map_index: The task map index.
+
+    Returns:
+        A deterministic OpenLineage run ID for the task instance.
+    """
+    return str(
+        generate_static_uuid(
+            instant=logical_date,
+            data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}.{map_index}".encode(),
+        )
+    )
+
+
+def is_valid_uuid(uuid_string: str | None) -> bool:
+    """Validate that a string is a valid UUID format."""
+    if uuid_string is None:
+        return False
+    try:
+        from uuid import UUID
+
+        UUID(uuid_string)
+        return True
+    except (ValueError, TypeError):
+        return False
+
+
+def build_dag_run_ol_run_id(dag_id: str, logical_date: datetime.datetime, clear_number: int) -> str:
+    """
+    Generate a deterministic OpenLineage run ID for a DAG run.
+
+    Args:
+        dag_id: The DAG identifier.
+        logical_date: The logical execution date.
+        clear_number: The DAG run clear number.
+
+    Returns:
+        A deterministic OpenLineage run ID for the DAG run.
+    """
+    return str(
+        generate_static_uuid(
+            instant=logical_date,
+            data=f"{conf.namespace()}.{dag_id}.{clear_number}".encode(),
+        )
+    )
+
+
+def _get_eagerly_loaded_dagrun_consumed_asset_events(dag_id: str, dag_run_id: str) -> list[AssetEvent]:
+    """
+    Retrieve consumed asset events for a DagRun with relationships eagerly loaded.
+
+    Downstream code accesses source_task_instance, source_dag_run, and asset on each AssetEvent.
+    These relationships are lazy-loaded by default, which could cause an N+1 query problem
+    (2 + 3*N queries for N events). Using `joinedload` fetches everything in a single query.
+    The returned AssetEvent objects have all needed relationships pre-populated in memory,
+    so they can be safely used after the session is closed.
+
+    Returns:
+        AssetEvent objects with populated relationships, or an empty list if the DagRun is not found.
+    """
+    # This should only be used on scheduler, so DB access is allowed
+    from sqlalchemy import select
+    from sqlalchemy.orm import joinedload
+
+    from airflow.utils.session import create_session
+
+    if AIRFLOW_V_3_0_PLUS:
+        from airflow.models.asset import AssetEvent
+
+        options = (
+            joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.source_dag_run),
+            joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.source_task_instance),
+            joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.asset),
+        )
+
+    else:  # AF2 path
+        from airflow.models.dataset import DatasetEvent
+
+        options = (
+            joinedload(DagRun.consumed_dataset_events).joinedload(DatasetEvent.source_dag_run),
+            joinedload(DagRun.consumed_dataset_events).joinedload(DatasetEvent.source_task_instance),
+            joinedload(DagRun.consumed_dataset_events).joinedload(DatasetEvent.dataset),
+        )
+
+    with create_session() as session:
+        dag_run_with_events = session.scalar(
+            select(DagRun).where(DagRun.dag_id == dag_id).where(DagRun.run_id == dag_run_id).options(*options)
+        )

Review Comment:
   Should we always create a session there?
   
   Not sure of the latest patterns wrt that
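   
   For context — a minimal sketch of the two patterns I've seen around the codebase: the local `create_session()` this PR uses, vs. `@provide_session`, which lets callers (and tests) inject a session and otherwise opens a fresh one. Sketch only; the fetch functions are made up:
   
   ```python
   from sqlalchemy import select
   from sqlalchemy.orm import Session
   
   from airflow.models.dagrun import DagRun
   from airflow.utils.session import NEW_SESSION, create_session, provide_session
   
   
   def fetch_dag_run_local(dag_id: str, run_id: str) -> DagRun | None:
       # Pattern A: open (and close) a session locally, as this PR does.
       # Note the returned object is detached once the session closes.
       with create_session() as session:
           return session.scalar(select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == run_id))
   
   
   @provide_session
   def fetch_dag_run(dag_id: str, run_id: str, session: Session = NEW_SESSION) -> DagRun | None:
       # Pattern B: callers may pass a session; otherwise one is provided per call.
       return session.scalar(select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == run_id))
   ```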



##########
providers/openlineage/src/airflow/providers/openlineage/utils/utils.py:
##########
@@ -1000,6 +1003,358 @@ def get_task_duration(ti):
     }
 
 
+def is_dag_run_asset_triggered(
+    dag_run: DagRun,
+):
+    """Return whether the given DAG run was triggered by an asset."""
+    if AIRFLOW_V_3_0_PLUS:
+        from airflow.utils.types import DagRunTriggeredByType
+
+        return dag_run.triggered_by == DagRunTriggeredByType.ASSET
+
+    # AF 2 Path
+    from airflow.models.dagrun import DagRunType
+
+    return dag_run.run_type == DagRunType.DATASET_TRIGGERED  # type: ignore[attr-defined]  # This attr is available on AF2, but mypy can't see it
+
+
+def build_task_instance_ol_run_id(
+    dag_id: str,
+    task_id: str,
+    try_number: int,
+    logical_date: datetime.datetime,
+    map_index: int,
+):
+    """
+    Generate a deterministic OpenLineage run ID for a task instance.
+
+    Args:
+        dag_id: The DAG identifier.
+        task_id: The task identifier.
+        try_number: The task try number.
+        logical_date: The logical execution date from the DagRun.
+        map_index: The task map index.
+
+    Returns:
+        A deterministic OpenLineage run ID for the task instance.
+    """
+    return str(
+        generate_static_uuid(
+            instant=logical_date,
+            data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}.{map_index}".encode(),
+        )
+    )
+
+
+def is_valid_uuid(uuid_string: str | None) -> bool:
+    """Validate that a string is a valid UUID format."""
+    if uuid_string is None:
+        return False
+    try:
+        from uuid import UUID
+
+        UUID(uuid_string)
+        return True
+    except (ValueError, TypeError):
+        return False
+
+
+def build_dag_run_ol_run_id(dag_id: str, logical_date: datetime.datetime, clear_number: int) -> str:
+    """
+    Generate a deterministic OpenLineage run ID for a DAG run.
+
+    Args:
+        dag_id: The DAG identifier.
+        logical_date: The logical execution date.
+        clear_number: The DAG run clear number.
+
+    Returns:
+        A deterministic OpenLineage run ID for the DAG run.
+    """
+    return str(
+        generate_static_uuid(
+            instant=logical_date,
+            data=f"{conf.namespace()}.{dag_id}.{clear_number}".encode(),
+        )
+    )
+
+
+def _get_eagerly_loaded_dagrun_consumed_asset_events(dag_id: str, dag_run_id: str) -> list[AssetEvent]:
+    """
+    Retrieve consumed asset events for a DagRun with relationships eagerly loaded.
+
+    Downstream code accesses source_task_instance, source_dag_run, and asset on each AssetEvent.
+    These relationships are lazy-loaded by default, which could cause an N+1 query problem
+    (2 + 3*N queries for N events). Using `joinedload` fetches everything in a single query.
+    The returned AssetEvent objects have all needed relationships pre-populated in memory,
+    so they can be safely used after the session is closed.
+
+    Returns:
+        AssetEvent objects with populated relationships, or an empty list if the DagRun is not found.
+    """
+    # This should only be used on scheduler, so DB access is allowed
+    from sqlalchemy import select
+    from sqlalchemy.orm import joinedload
+
+    from airflow.utils.session import create_session
+
+    if AIRFLOW_V_3_0_PLUS:
+        from airflow.models.asset import AssetEvent
+
+        options = (
+            joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.source_dag_run),
+            joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.source_task_instance),
+            joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.asset),
+        )
+
+    else:  # AF2 path
+        from airflow.models.dataset import DatasetEvent
+
+        options = (
+            joinedload(DagRun.consumed_dataset_events).joinedload(DatasetEvent.source_dag_run),
+            joinedload(DagRun.consumed_dataset_events).joinedload(DatasetEvent.source_task_instance),
+            joinedload(DagRun.consumed_dataset_events).joinedload(DatasetEvent.dataset),
+        )
+
+    with create_session() as session:
+        dag_run_with_events = session.scalar(
+            select(DagRun).where(DagRun.dag_id == dag_id).where(DagRun.run_id == dag_run_id).options(*options)
+        )
+
+    if not dag_run_with_events:
+        return []
+
+    if AIRFLOW_V_3_0_PLUS:
+        events = dag_run_with_events.consumed_asset_events
+    else:  # AF2 path
+        events = dag_run_with_events.consumed_dataset_events
+
+    return events
+
+
+def _extract_ol_info_from_asset_event(asset_event: AssetEvent) -> dict[str, str] | None:
+    """
+    Extract OpenLineage job information from an AssetEvent.
+
+    Information is gathered from multiple potential sources, checked in priority
+    order:
+    1. TaskInstance (primary): Provides the most complete and reliable context.
+    2. AssetEvent source fields (fallback): Offers basic `dag_id.task_id` metadata.
+    3. `asset_event.extra["openlineage"]` (last resort): May include user-provided OpenLineage details.
+
+    Args:
+        asset_event: The AssetEvent from which to extract job information.
+
+    Returns:
+        A dictionary containing `job_name`, `job_namespace`, and optionally
+        `run_id`, or `None` if insufficient information is available.
+    """
+    # First check for TaskInstance
+    if ti := asset_event.source_task_instance:
+        result = {
+            "job_name": get_job_name(ti),
+            "job_namespace": conf.namespace(),
+        }
+        source_dr = asset_event.source_dag_run
+        if source_dr:
+            logical_date = source_dr.logical_date  # Get logical date from DagRun for OL run_id generation
+            if AIRFLOW_V_3_0_PLUS and logical_date is None:
+                logical_date = source_dr.run_after
+            if logical_date is not None:
+                result["run_id"] = build_task_instance_ol_run_id(
+                    dag_id=ti.dag_id,
+                    task_id=ti.task_id,
+                    try_number=ti.try_number,
+                    logical_date=logical_date,
+                    map_index=ti.map_index,
+                )
+        return result
+
+    # Then, check AssetEvent source_* fields
+    if asset_event.source_dag_id and asset_event.source_task_id:
+        return {
+            "job_name": 
f"{asset_event.source_dag_id}.{asset_event.source_task_id}",
+            "job_namespace": conf.namespace(),
+            # run_id cannot be constructed from these fields alone
+        }

Review Comment:
   Is it useful when we don't have a run ID at all?
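   
   For concreteness, a toy illustration of the case I mean (class and values made up) — without a `run_id` the parent can only be identified at job level, not run level:
   
   ```python
   class FakeAssetEvent:
       # Only the source_* columns are populated; no TaskInstance, no extra.
       source_task_instance = None
       source_dag_id = "producer_dag"
       source_task_id = "producer_task"
       extra = None
   
   info = _extract_ol_info_from_asset_event(FakeAssetEvent())
   # -> {"job_name": "producer_dag.producer_task", "job_namespace": "<namespace>"}
   # Enough for a job-level lineage edge, but not enough to link a concrete run.
   ```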



##########
providers/openlineage/src/airflow/providers/openlineage/utils/utils.py:
##########
@@ -1000,6 +1003,358 @@ def get_task_duration(ti):
     }
 
 
+def is_dag_run_asset_triggered(
+    dag_run: DagRun,
+):
+    """Return whether the given DAG run was triggered by an asset."""
+    if AIRFLOW_V_3_0_PLUS:
+        from airflow.utils.types import DagRunTriggeredByType
+
+        return dag_run.triggered_by == DagRunTriggeredByType.ASSET
+
+    # AF 2 Path
+    from airflow.models.dagrun import DagRunType
+
+    return dag_run.run_type == DagRunType.DATASET_TRIGGERED  # type: ignore[attr-defined]  # This attr is available on AF2, but mypy can't see it
+
+
+def build_task_instance_ol_run_id(
+    dag_id: str,
+    task_id: str,
+    try_number: int,
+    logical_date: datetime.datetime,
+    map_index: int,
+):
+    """
+    Generate a deterministic OpenLineage run ID for a task instance.
+
+    Args:
+        dag_id: The DAG identifier.
+        task_id: The task identifier.
+        try_number: The task try number.
+        logical_date: The logical execution date from the DagRun.
+        map_index: The task map index.
+
+    Returns:
+        A deterministic OpenLineage run ID for the task instance.
+    """
+    return str(
+        generate_static_uuid(
+            instant=logical_date,
+            data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}.{map_index}".encode(),
+        )
+    )
+
+
+def is_valid_uuid(uuid_string: str | None) -> bool:
+    """Validate that a string is a valid UUID format."""
+    if uuid_string is None:
+        return False
+    try:
+        from uuid import UUID
+
+        UUID(uuid_string)
+        return True
+    except (ValueError, TypeError):
+        return False
+
+
+def build_dag_run_ol_run_id(dag_id: str, logical_date: datetime.datetime, clear_number: int) -> str:
+    """
+    Generate a deterministic OpenLineage run ID for a DAG run.
+
+    Args:
+        dag_id: The DAG identifier.
+        logical_date: The logical execution date.
+        clear_number: The DAG run clear number.
+
+    Returns:
+        A deterministic OpenLineage run ID for the DAG run.
+    """
+    return str(
+        generate_static_uuid(
+            instant=logical_date,
+            data=f"{conf.namespace()}.{dag_id}.{clear_number}".encode(),
+        )
+    )
+
+
+def _get_eagerly_loaded_dagrun_consumed_asset_events(dag_id: str, dag_run_id: str) -> list[AssetEvent]:
+    """
+    Retrieve consumed asset events for a DagRun with relationships eagerly loaded.
+
+    Downstream code accesses source_task_instance, source_dag_run, and asset on each AssetEvent.
+    These relationships are lazy-loaded by default, which could cause an N+1 query problem
+    (2 + 3*N queries for N events). Using `joinedload` fetches everything in a single query.
+    The returned AssetEvent objects have all needed relationships pre-populated in memory,
+    so they can be safely used after the session is closed.
+
+    Returns:
+        AssetEvent objects with populated relationships, or an empty list if the DagRun is not found.
+    """
+    # This should only be used on scheduler, so DB access is allowed
+    from sqlalchemy import select
+    from sqlalchemy.orm import joinedload
+
+    from airflow.utils.session import create_session
+
+    if AIRFLOW_V_3_0_PLUS:
+        from airflow.models.asset import AssetEvent
+
+        options = (
+            joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.source_dag_run),
+            joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.source_task_instance),
+            joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.asset),
+        )
+
+    else:  # AF2 path
+        from airflow.models.dataset import DatasetEvent
+
+        options = (
+            joinedload(DagRun.consumed_dataset_events).joinedload(DatasetEvent.source_dag_run),
+            joinedload(DagRun.consumed_dataset_events).joinedload(DatasetEvent.source_task_instance),
+            joinedload(DagRun.consumed_dataset_events).joinedload(DatasetEvent.dataset),
+        )
+
+    with create_session() as session:
+        dag_run_with_events = session.scalar(
+            select(DagRun).where(DagRun.dag_id == dag_id).where(DagRun.run_id == dag_run_id).options(*options)
+        )
+
+    if not dag_run_with_events:
+        return []
+
+    if AIRFLOW_V_3_0_PLUS:
+        events = dag_run_with_events.consumed_asset_events
+    else:  # AF2 path
+        events = dag_run_with_events.consumed_dataset_events
+
+    return events
+
+
+def _extract_ol_info_from_asset_event(asset_event: AssetEvent) -> dict[str, str] | None:
+    """
+    Extract OpenLineage job information from an AssetEvent.
+
+    Information is gathered from multiple potential sources, checked in priority
+    order:
+    1. TaskInstance (primary): Provides the most complete and reliable context.
+    2. AssetEvent source fields (fallback): Offers basic `dag_id.task_id` metadata.
+    3. `asset_event.extra["openlineage"]` (last resort): May include user-provided OpenLineage details.
+
+    Args:
+        asset_event: The AssetEvent from which to extract job information.
+
+    Returns:
+        A dictionary containing `job_name`, `job_namespace`, and optionally
+        `run_id`, or `None` if insufficient information is available.
+    """
+    # First check for TaskInstance
+    if ti := asset_event.source_task_instance:
+        result = {
+            "job_name": get_job_name(ti),
+            "job_namespace": conf.namespace(),
+        }
+        source_dr = asset_event.source_dag_run
+        if source_dr:
+            logical_date = source_dr.logical_date  # Get logical date from DagRun for OL run_id generation
+            if AIRFLOW_V_3_0_PLUS and logical_date is None:
+                logical_date = source_dr.run_after
+            if logical_date is not None:
+                result["run_id"] = build_task_instance_ol_run_id(
+                    dag_id=ti.dag_id,
+                    task_id=ti.task_id,
+                    try_number=ti.try_number,
+                    logical_date=logical_date,
+                    map_index=ti.map_index,
+                )
+        return result
+
+    # Then, check AssetEvent source_* fields
+    if asset_event.source_dag_id and asset_event.source_task_id:
+        return {
+            "job_name": 
f"{asset_event.source_dag_id}.{asset_event.source_task_id}",
+            "job_namespace": conf.namespace(),
+            # run_id cannot be constructed from these fields alone
+        }
+
+    # Lastly, check asset_event.extra["openlineage"]
+    if asset_event.extra:
+        ol_info_from_extra = asset_event.extra.get("openlineage")
+        if isinstance(ol_info_from_extra, dict):
+            job_name = ol_info_from_extra.get("parentJobName")
+            job_namespace = ol_info_from_extra.get("parentJobNamespace")
+            run_id = ol_info_from_extra.get("parentRunId")
+
+            if job_name and job_namespace:
+                result = {
+                    "job_name": str(job_name),
+                    "job_namespace": str(job_namespace),
+                }
+                if run_id:
+                    if not is_valid_uuid(str(run_id)):
+                        log.warning(
+                            "Invalid runId in AssetEvent.extra; ignoring 
value. event_id=%s, run_id=%s",
+                            asset_event.id,
+                            run_id,
+                        )
+                    else:
+                        result["run_id"] = str(run_id)
+                return result
+    return None
+
+
+def _get_ol_job_dependencies_from_asset_events(events: list[AssetEvent]) -> list[dict[str, Any]]:
+    """
+    Extract and deduplicate OpenLineage job dependencies from asset events.
+
+    This function processes a list of asset events, extracts OpenLineage dependency information
+    from all relevant sources, and deduplicates the results based on the tuple (job_namespace, job_name, run_id)
+    to prevent emitting duplicate dependencies. Multiple asset events from the same job but different
+    source runs/assets are aggregated into a single dependency entry with all source information preserved.
+
+    Args:
+        events: List of AssetEvent objects to process.
+
+    Returns:
+        A list of deduplicated dictionaries containing OpenLineage job dependency information.
+        Each dictionary includes job_name, job_namespace, optional run_id, and an asset_events
+        list containing source information from all aggregated events.
+    """
+    # Use a dictionary keyed by (namespace, job_name, run_id) to deduplicate
+    # Multiple asset events from the same task instance should only create one dependency
+    deduplicated_jobs: dict[tuple[str, str, str | None], dict[str, Any]] = {}
+
+    for asset_event in events:
+        # Extract OpenLineage information
+        ol_info = _extract_ol_info_from_asset_event(asset_event)
+
+        # Skip if we don't have minimum required info (job_name and namespace)
+        if not ol_info:
+            log.debug(
+                "Insufficient OpenLineage information, skipping asset event: 
%s",
+                str(asset_event),
+            )
+            continue
+
+        # Create deduplication key: (namespace, job_name, run_id)
+        # We deduplicate on job identity (namespace + name + run_id), not on source dag_run_id
+        # Multiple asset events from the same job but different source runs/assets are aggregated
+        dedup_key = (
+            ol_info["job_namespace"],
+            ol_info["job_name"],
+            ol_info.get("run_id"),
+        )

Review Comment:
   nice :)
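   
   For anyone reading along — a toy check of the aggregation this key gives us (values made up):
   
   ```python
   deduplicated_jobs: dict[tuple[str, str, str | None], dict] = {}
   for ol_info in [
       {"job_namespace": "ns", "job_name": "dag.task", "run_id": "run-1"},
       {"job_namespace": "ns", "job_name": "dag.task", "run_id": "run-1"},  # duplicate of the first
       {"job_namespace": "ns", "job_name": "dag.task", "run_id": None},  # no run_id -> distinct entry
   ]:
       dedup_key = (ol_info["job_namespace"], ol_info["job_name"], ol_info.get("run_id"))
       deduplicated_jobs.setdefault(dedup_key, ol_info)
   
   assert len(deduplicated_jobs) == 2  # same (ns, name, run_id) collapses into one dependency
   ```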


