This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f0b51cdacc openlineage: add some debug logging around sql parser call sites (#40200)
f0b51cdacc is described below
commit f0b51cdacc6155e4e4495a88109a01decab9e201
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Wed Jun 12 16:50:59 2024 +0200
openlineage: add some debug logging around sql parser call sites (#40200)
Signed-off-by: Maciej Obuchowski <[email protected]>
---
airflow/providers/openlineage/sqlparser.py | 16 ++++++++++++----
airflow/providers/openlineage/utils/sql.py | 6 ++++++
airflow/providers/snowflake/hooks/snowflake.py | 6 +++---
3 files changed, 21 insertions(+), 7 deletions(-)
diff --git a/airflow/providers/openlineage/sqlparser.py b/airflow/providers/openlineage/sqlparser.py
index f181ff8cce..470b93d3cb 100644
--- a/airflow/providers/openlineage/sqlparser.py
+++ b/airflow/providers/openlineage/sqlparser.py
@@ -39,6 +39,7 @@ from airflow.providers.openlineage.utils.sql import (
get_table_schemas,
)
from airflow.typing_compat import TypedDict
+from airflow.utils.log.logging_mixin import LoggingMixin
if TYPE_CHECKING:
from sqlalchemy.engine import Engine
@@ -116,7 +117,7 @@ def from_table_meta(
return Dataset(namespace=namespace, name=name if not is_uppercase else
name.upper())
-class SQLParser:
+class SQLParser(LoggingMixin):
"""Interface for openlineage-sql.
:param dialect: dialect specific to the database
@@ -124,11 +125,18 @@ class SQLParser:
"""
def __init__(self, dialect: str | None = None, default_schema: str | None
= None) -> None:
+ super().__init__()
self.dialect = dialect
self.default_schema = default_schema
def parse(self, sql: list[str] | str) -> SqlMeta | None:
"""Parse a single or a list of SQL statements."""
+ self.log.debug(
+ "OpenLineage calling SQL parser with SQL %s dialect %s schema %s",
+ sql,
+ self.dialect,
+ self.default_schema,
+ )
return parse(sql=sql, dialect=self.dialect,
default_schema=self.default_schema)
def parse_table_schemas(
@@ -151,6 +159,7 @@ class SQLParser:
"database": database or database_info.database,
"use_flat_cross_db_query": database_info.use_flat_cross_db_query,
}
+ self.log.info("PRE getting schemas for input and output tables")
return get_table_schemas(
hook,
namespace,
@@ -335,9 +344,8 @@ class SQLParser:
return split_statement(sql)
return [obj for stmt in sql for obj in cls.split_sql_string(stmt) if
obj != ""]
- @classmethod
def create_information_schema_query(
- cls,
+ self,
tables: list[DbTableMeta],
normalize_name: Callable[[str], str],
is_cross_db: bool,
@@ -349,7 +357,7 @@ class SQLParser:
sqlalchemy_engine: Engine | None = None,
) -> str:
"""Create SELECT statement to query information schema table."""
- tables_hierarchy = cls._get_tables_hierarchy(
+ tables_hierarchy = self._get_tables_hierarchy(
tables,
normalize_name=normalize_name,
database=database,
diff --git a/airflow/providers/openlineage/utils/sql.py b/airflow/providers/openlineage/utils/sql.py
index f959745b93..f5d083b4e4 100644
--- a/airflow/providers/openlineage/utils/sql.py
+++ b/airflow/providers/openlineage/utils/sql.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import logging
from collections import defaultdict
from contextlib import closing
from enum import IntEnum
@@ -33,6 +34,9 @@ if TYPE_CHECKING:
from airflow.hooks.base import BaseHook
+log = logging.getLogger(__name__)
+
+
class ColumnIndex(IntEnum):
"""Enumerates the indices of columns in information schema view."""
@@ -90,6 +94,7 @@ def get_table_schemas(
if not in_query and not out_query:
return [], []
+ log.debug("Starting to query database for table schemas")
with closing(hook.get_conn()) as conn, closing(conn.cursor()) as cursor:
if in_query:
cursor.execute(in_query)
@@ -101,6 +106,7 @@ def get_table_schemas(
out_datasets = [x.to_dataset(namespace, database, schema) for x in
parse_query_result(cursor)]
else:
out_datasets = []
+ log.debug("Got table schema query result from database.")
return in_datasets, out_datasets
diff --git a/airflow/providers/snowflake/hooks/snowflake.py b/airflow/providers/snowflake/hooks/snowflake.py
index 978bcf75e1..e2a4a453fb 100644
--- a/airflow/providers/snowflake/hooks/snowflake.py
+++ b/airflow/providers/snowflake/hooks/snowflake.py
@@ -473,10 +473,10 @@ class SnowflakeHook(DbApiHook):
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.openlineage.sqlparser import SQLParser
- connection = self.get_connection(getattr(self, self.conn_name_attr))
- namespace =
SQLParser.create_namespace(self.get_openlineage_database_info(connection))
-
if self.query_ids:
+ self.log.debug("openlineage: getting connection to get database
info")
+ connection = self.get_connection(getattr(self,
self.conn_name_attr))
+ namespace =
SQLParser.create_namespace(self.get_openlineage_database_info(connection))
return OperatorLineage(
run_facets={
"externalQuery": ExternalQueryRunFacet(