This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f0b51cdacc openlineage: add some debug logging around sql parser call sites (#40200)
f0b51cdacc is described below

commit f0b51cdacc6155e4e4495a88109a01decab9e201
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Wed Jun 12 16:50:59 2024 +0200

    openlineage: add some debug logging around sql parser call sites (#40200)
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 airflow/providers/openlineage/sqlparser.py     | 16 ++++++++++++----
 airflow/providers/openlineage/utils/sql.py     |  6 ++++++
 airflow/providers/snowflake/hooks/snowflake.py |  6 +++---
 3 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/airflow/providers/openlineage/sqlparser.py b/airflow/providers/openlineage/sqlparser.py
index f181ff8cce..470b93d3cb 100644
--- a/airflow/providers/openlineage/sqlparser.py
+++ b/airflow/providers/openlineage/sqlparser.py
@@ -39,6 +39,7 @@ from airflow.providers.openlineage.utils.sql import (
     get_table_schemas,
 )
 from airflow.typing_compat import TypedDict
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 if TYPE_CHECKING:
     from sqlalchemy.engine import Engine
@@ -116,7 +117,7 @@ def from_table_meta(
     return Dataset(namespace=namespace, name=name if not is_uppercase else name.upper())
 
 
-class SQLParser:
+class SQLParser(LoggingMixin):
     """Interface for openlineage-sql.
 
     :param dialect: dialect specific to the database
@@ -124,11 +125,18 @@ class SQLParser:
     """
 
     def __init__(self, dialect: str | None = None, default_schema: str | None = None) -> None:
+        super().__init__()
         self.dialect = dialect
         self.default_schema = default_schema
 
     def parse(self, sql: list[str] | str) -> SqlMeta | None:
         """Parse a single or a list of SQL statements."""
+        self.log.debug(
+            "OpenLineage calling SQL parser with SQL %s dialect %s schema %s",
+            sql,
+            self.dialect,
+            self.default_schema,
+        )
         return parse(sql=sql, dialect=self.dialect, default_schema=self.default_schema)
 
     def parse_table_schemas(
@@ -151,6 +159,7 @@ class SQLParser:
             "database": database or database_info.database,
             "use_flat_cross_db_query": database_info.use_flat_cross_db_query,
         }
+        self.log.info("PRE getting schemas for input and output tables")
         return get_table_schemas(
             hook,
             namespace,
@@ -335,9 +344,8 @@ class SQLParser:
             return split_statement(sql)
         return [obj for stmt in sql for obj in cls.split_sql_string(stmt) if obj != ""]
 
-    @classmethod
     def create_information_schema_query(
-        cls,
+        self,
         tables: list[DbTableMeta],
         normalize_name: Callable[[str], str],
         is_cross_db: bool,
@@ -349,7 +357,7 @@ class SQLParser:
         sqlalchemy_engine: Engine | None = None,
     ) -> str:
         """Create SELECT statement to query information schema table."""
-        tables_hierarchy = cls._get_tables_hierarchy(
+        tables_hierarchy = self._get_tables_hierarchy(
             tables,
             normalize_name=normalize_name,
             database=database,
diff --git a/airflow/providers/openlineage/utils/sql.py b/airflow/providers/openlineage/utils/sql.py
index f959745b93..f5d083b4e4 100644
--- a/airflow/providers/openlineage/utils/sql.py
+++ b/airflow/providers/openlineage/utils/sql.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import logging
 from collections import defaultdict
 from contextlib import closing
 from enum import IntEnum
@@ -33,6 +34,9 @@ if TYPE_CHECKING:
     from airflow.hooks.base import BaseHook
 
 
+log = logging.getLogger(__name__)
+
+
 class ColumnIndex(IntEnum):
     """Enumerates the indices of columns in information schema view."""
 
@@ -90,6 +94,7 @@ def get_table_schemas(
     if not in_query and not out_query:
         return [], []
 
+    log.debug("Starting to query database for table schemas")
     with closing(hook.get_conn()) as conn, closing(conn.cursor()) as cursor:
         if in_query:
             cursor.execute(in_query)
@@ -101,6 +106,7 @@ def get_table_schemas(
             out_datasets = [x.to_dataset(namespace, database, schema) for x in parse_query_result(cursor)]
         else:
             out_datasets = []
+    log.debug("Got table schema query result from database.")
     return in_datasets, out_datasets
 
 
diff --git a/airflow/providers/snowflake/hooks/snowflake.py b/airflow/providers/snowflake/hooks/snowflake.py
index 978bcf75e1..e2a4a453fb 100644
--- a/airflow/providers/snowflake/hooks/snowflake.py
+++ b/airflow/providers/snowflake/hooks/snowflake.py
@@ -473,10 +473,10 @@ class SnowflakeHook(DbApiHook):
         from airflow.providers.openlineage.extractors import OperatorLineage
         from airflow.providers.openlineage.sqlparser import SQLParser
 
-        connection = self.get_connection(getattr(self, self.conn_name_attr))
-        namespace = SQLParser.create_namespace(self.get_openlineage_database_info(connection))
-
         if self.query_ids:
+            self.log.debug("openlineage: getting connection to get database info")
+            connection = self.get_connection(getattr(self, self.conn_name_attr))
+            namespace = SQLParser.create_namespace(self.get_openlineage_database_info(connection))
             return OperatorLineage(
                 run_facets={
                     "externalQuery": ExternalQueryRunFacet(

Reply via email to