kacpermuda commented on code in PR #62171: URL: https://github.com/apache/airflow/pull/62171#discussion_r2846058273
########## providers/openlineage/src/airflow/providers/openlineage/utils/sql_hook_lineage.py: ########## @@ -0,0 +1,242 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Utilities for processing hook-level lineage into OpenLineage events.""" + +from __future__ import annotations + +import datetime as dt +import logging + +from openlineage.client.event_v2 import Job, Run, RunEvent, RunState +from openlineage.client.facet_v2 import external_query_run, job_type_job, sql_job +from openlineage.client.uuid import generate_new_uuid + +from airflow.providers.common.compat.sdk import timezone +from airflow.providers.common.sql.hooks.lineage import SqlJobHookLineageExtra +from airflow.providers.openlineage.extractors.base import OperatorLineage +from airflow.providers.openlineage.plugins.listener import get_openlineage_listener +from airflow.providers.openlineage.plugins.macros import ( + _get_logical_date, + lineage_job_name, + lineage_job_namespace, + lineage_root_job_name, + lineage_root_job_namespace, + lineage_root_run_id, + lineage_run_id, +) +from airflow.providers.openlineage.sqlparser import SQLParser, get_openlineage_facets_with_sql +from airflow.providers.openlineage.utils.utils import _get_parent_run_facet + +log = 
logging.getLogger(__name__) + + +def get_lineage_from_sql_extras( + task_instance, sql_extras: list, is_successful: bool = True +) -> OperatorLineage | None: + """ + Process ``sql_job`` extras and emit per-query OpenLineage events. + + For each extra: + + * Parse SQL via :func:`get_openlineage_facets_with_sql` to obtain inputs, + outputs and facets (schema enrichment, column lineage, etc.). + * Emit a separate START + COMPLETE/FAIL event pair (child job of the task). + + When there is exactly **one** processed extra, the parsed metadata is also + returned so it can be merged into the parent task event. When there are Review Comment: Adjusted: nothing is returned for the SQL lineage anymore; only the separate per-query events are emitted. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
