mobuchowski commented on code in PR #62171:
URL: https://github.com/apache/airflow/pull/62171#discussion_r2840955676


##########
providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py:
##########
@@ -193,30 +192,86 @@ def extract_inlets_and_outlets(self, task_metadata: 
OperatorLineage, task) -> No
             if d:
                 task_metadata.outputs.append(d)
 
-    def get_hook_lineage(self) -> tuple[list[Dataset], list[Dataset]] | None:
+    def get_hook_lineage(
+        self,
+        task_instance=None,
+        task_instance_state: TaskInstanceState | None = None,
+    ) -> OperatorLineage | None:
+        """
+        Extract lineage from the Hook Lineage Collector.
+
+        Combines two sources into a single :class:`OperatorLineage`:
+
+        * **Asset-based** inputs/outputs reported via ``add_input_asset`` / 
``add_output_asset``.
+        * **SQL-based** lineage from ``sql_job`` extras reported via
+          
:func:`~airflow.providers.common.sql.hooks.lineage.send_sql_hook_lineage`.
+          When ``task_instance`` is provided, each extra is parsed and 
separate per-query
+          OpenLineage events are emitted.
+
+        Returns ``None`` when nothing was collected.
+        """
         try:
             from airflow.providers.common.compat.lineage.hook import 
get_hook_lineage_collector
+            from airflow.providers.common.sql.hooks.lineage import 
SqlJobHookLineageExtra
         except ImportError:
             return None
 
-        if not hasattr(get_hook_lineage_collector(), "has_collected"):
+        collector = get_hook_lineage_collector()
+        if not hasattr(collector, "has_collected"):
             return None
-        if not get_hook_lineage_collector().has_collected:
+        if not collector.has_collected:
             return None
 
         self.log.debug("OpenLineage will extract lineage from Hook Lineage 
Collector.")
-        return (
-            [
-                asset
-                for asset_info in 
get_hook_lineage_collector().collected_assets.inputs
-                if (asset := translate_airflow_asset(asset_info.asset, 
asset_info.context)) is not None
-            ],
-            [
-                asset
-                for asset_info in 
get_hook_lineage_collector().collected_assets.outputs
-                if (asset := translate_airflow_asset(asset_info.asset, 
asset_info.context)) is not None
-            ],
-        )
+        collected = collector.collected_assets
+
+        # Asset-based inputs/outputs - keep only assets that can be translated 
to OL datasets
+        inputs = [
+            asset
+            for asset_info in collected.inputs
+            if (asset := translate_airflow_asset(asset_info.asset, 
asset_info.context)) is not None
+        ]
+        outputs = [
+            asset
+            for asset_info in collected.outputs
+            if (asset := translate_airflow_asset(asset_info.asset, 
asset_info.context)) is not None
+        ]
+
+        # SQL-based lineage - keep only SQL extra with query_text or job_id.
+        sql_extras = [
+            info
+            for info in collected.extra
+            if info.key == SqlJobHookLineageExtra.KEY.value
+            and (
+                
info.value.get(SqlJobHookLineageExtra.VALUE__SQL_STATEMENT.value)
+                or info.value.get(SqlJobHookLineageExtra.VALUE__JOB_ID.value)
+            )
+        ]
+
+        sql_lineage: OperatorLineage | None = None
+        if sql_extras:
+            from airflow.providers.openlineage.utils.sql_hook_lineage import 
get_lineage_from_sql_extras
+
+            self.log.debug("Found %s sql_job extra(s) in Hook Lineage 
Collector.", len(sql_extras))
+            sql_lineage = get_lineage_from_sql_extras(
+                task_instance=task_instance,
+                sql_extras=sql_extras,
+                is_successful=task_instance_state != TaskInstanceState.FAILED,
+            )
+
+        if not inputs and not outputs and sql_lineage is None:
+            return None
+
+        result: OperatorLineage = OperatorLineage(inputs=inputs, 
outputs=outputs)
+        if sql_lineage:

Review Comment:
   Use `if sql_lineage is not None` here, to match the `sql_lineage is None` 
check a few lines above.



##########
providers/openlineage/src/airflow/providers/openlineage/utils/sql_hook_lineage.py:
##########
@@ -0,0 +1,242 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Utilities for processing hook-level lineage into OpenLineage events."""
+
+from __future__ import annotations
+
+import datetime as dt
+import logging
+
+from openlineage.client.event_v2 import Job, Run, RunEvent, RunState
+from openlineage.client.facet_v2 import external_query_run, job_type_job, 
sql_job
+from openlineage.client.uuid import generate_new_uuid
+
+from airflow.providers.common.compat.sdk import timezone
+from airflow.providers.common.sql.hooks.lineage import SqlJobHookLineageExtra
+from airflow.providers.openlineage.extractors.base import OperatorLineage
+from airflow.providers.openlineage.plugins.listener import 
get_openlineage_listener
+from airflow.providers.openlineage.plugins.macros import (
+    _get_logical_date,
+    lineage_job_name,
+    lineage_job_namespace,
+    lineage_root_job_name,
+    lineage_root_job_namespace,
+    lineage_root_run_id,
+    lineage_run_id,
+)
+from airflow.providers.openlineage.sqlparser import SQLParser, 
get_openlineage_facets_with_sql
+from airflow.providers.openlineage.utils.utils import _get_parent_run_facet
+
+log = logging.getLogger(__name__)
+
+
+def get_lineage_from_sql_extras(
+    task_instance, sql_extras: list, is_successful: bool = True
+) -> OperatorLineage | None:
+    """
+    Process ``sql_job`` extras and emit per-query OpenLineage events.
+
+    For each extra:
+
+    * Parse SQL via :func:`get_openlineage_facets_with_sql` to obtain inputs,
+      outputs and facets (schema enrichment, column lineage, etc.).
+    * Emit a separate START + COMPLETE/FAIL event pair (child job of the task).
+
+    When there is exactly **one** processed extra, the parsed metadata is also
+    returned so it can be merged into the parent task event.  When there are

Review Comment:
   Do we really want this behavior? It seems to me that this always causes 
pain: you have to handle the same thing twice. By contrast, always emitting 
an additional event means you only have to deal with the event once.



##########
providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py:
##########
@@ -126,16 +124,21 @@ def extract_metadata(
                     task.task_id,
                     str(task_metadata),
                 )
-                task_metadata = self.validate_task_metadata(task_metadata)
-                if task_metadata:
-                    if (not task_metadata.inputs) and (not 
task_metadata.outputs):
-                        if (hook_lineage := self.get_hook_lineage()) is not 
None:
-                            inputs, outputs = hook_lineage
-                            task_metadata.inputs = inputs
-                            task_metadata.outputs = outputs
-                        else:
-                            self.extract_inlets_and_outlets(task_metadata, 
task)
-                    return task_metadata
+                task_metadata = self.validate_task_metadata(task_metadata) or 
OperatorLineage()
+                # In no inputs and outputs are present - check Hook Lineage

Review Comment:
   Typo: "In no" should be "If no".



##########
providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py:
##########
@@ -193,30 +192,86 @@ def extract_inlets_and_outlets(self, task_metadata: 
OperatorLineage, task) -> No
             if d:
                 task_metadata.outputs.append(d)
 
-    def get_hook_lineage(self) -> tuple[list[Dataset], list[Dataset]] | None:
+    def get_hook_lineage(
+        self,
+        task_instance=None,
+        task_instance_state: TaskInstanceState | None = None,
+    ) -> OperatorLineage | None:
+        """
+        Extract lineage from the Hook Lineage Collector.
+
+        Combines two sources into a single :class:`OperatorLineage`:
+
+        * **Asset-based** inputs/outputs reported via ``add_input_asset`` / 
``add_output_asset``.
+        * **SQL-based** lineage from ``sql_job`` extras reported via
+          
:func:`~airflow.providers.common.sql.hooks.lineage.send_sql_hook_lineage`.
+          When ``task_instance`` is provided, each extra is parsed and 
separate per-query
+          OpenLineage events are emitted.
+
+        Returns ``None`` when nothing was collected.
+        """
         try:
             from airflow.providers.common.compat.lineage.hook import 
get_hook_lineage_collector
+            from airflow.providers.common.sql.hooks.lineage import 
SqlJobHookLineageExtra
         except ImportError:
             return None
 
-        if not hasattr(get_hook_lineage_collector(), "has_collected"):
+        collector = get_hook_lineage_collector()
+        if not hasattr(collector, "has_collected"):
             return None
-        if not get_hook_lineage_collector().has_collected:
+        if not collector.has_collected:
             return None
 
         self.log.debug("OpenLineage will extract lineage from Hook Lineage 
Collector.")
-        return (
-            [
-                asset
-                for asset_info in 
get_hook_lineage_collector().collected_assets.inputs
-                if (asset := translate_airflow_asset(asset_info.asset, 
asset_info.context)) is not None
-            ],
-            [
-                asset
-                for asset_info in 
get_hook_lineage_collector().collected_assets.outputs
-                if (asset := translate_airflow_asset(asset_info.asset, 
asset_info.context)) is not None
-            ],
-        )
+        collected = collector.collected_assets
+
+        # Asset-based inputs/outputs - keep only assets that can be translated 
to OL datasets
+        inputs = [
+            asset
+            for asset_info in collected.inputs
+            if (asset := translate_airflow_asset(asset_info.asset, 
asset_info.context)) is not None
+        ]
+        outputs = [
+            asset
+            for asset_info in collected.outputs
+            if (asset := translate_airflow_asset(asset_info.asset, 
asset_info.context)) is not None
+        ]
+
+        # SQL-based lineage - keep only SQL extra with query_text or job_id.
+        sql_extras = [
+            info
+            for info in collected.extra
+            if info.key == SqlJobHookLineageExtra.KEY.value
+            and (
+                
info.value.get(SqlJobHookLineageExtra.VALUE__SQL_STATEMENT.value)
+                or info.value.get(SqlJobHookLineageExtra.VALUE__JOB_ID.value)
+            )
+        ]
+
+        sql_lineage: OperatorLineage | None = None
+        if sql_extras:
+            from airflow.providers.openlineage.utils.sql_hook_lineage import 
get_lineage_from_sql_extras
+
+            self.log.debug("Found %s sql_job extra(s) in Hook Lineage 
Collector.", len(sql_extras))
+            sql_lineage = get_lineage_from_sql_extras(
+                task_instance=task_instance,
+                sql_extras=sql_extras,
+                is_successful=task_instance_state != TaskInstanceState.FAILED,
+            )
+
+        if not inputs and not outputs and sql_lineage is None:
+            return None
+
+        result: OperatorLineage = OperatorLineage(inputs=inputs, 
outputs=outputs)
+        if sql_lineage:
+            result.inputs.extend(sql_lineage.inputs)
+            result.outputs.extend(sql_lineage.outputs)
+            for k, v in sql_lineage.run_facets.items():
+                result.run_facets.setdefault(k, v)
+            for k, v in sql_lineage.job_facets.items():
+                result.job_facets.setdefault(k, v)

Review Comment:
   Maybe we should add an `OperatorLineage.merge` function - I think this 
pattern is repeated often enough to justify it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to