This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6858ea46eb Make schema filter uppercase in `create_filter_clauses` (#35428)
6858ea46eb is described below
commit 6858ea46eb5282034b0695720d797dcb7ef91100
Author: Jakub Dardzinski <[email protected]>
AuthorDate: Sat Nov 4 00:26:02 2023 +0100
Make schema filter uppercase in `create_filter_clauses` (#35428)
---
airflow/providers/openlineage/utils/sql.py | 1 +
airflow/providers/snowflake/hooks/snowflake.py | 1 +
.../snowflake/operators/test_snowflake_sql.py | 131 ++++++++++++++++++---
3 files changed, 118 insertions(+), 15 deletions(-)
diff --git a/airflow/providers/openlineage/utils/sql.py
b/airflow/providers/openlineage/utils/sql.py
index f0bfbff6f5..7bd6043040 100644
--- a/airflow/providers/openlineage/utils/sql.py
+++ b/airflow/providers/openlineage/utils/sql.py
@@ -193,6 +193,7 @@ def create_filter_clauses(
name.upper() if uppercase_names else name for name in tables
)
if schema:
+ schema = schema.upper() if uppercase_names else schema
filter_clause = and_(information_schema_table.c.table_schema ==
schema, filter_clause)
filter_clauses.append(filter_clause)
return filter_clauses
diff --git a/airflow/providers/snowflake/hooks/snowflake.py
b/airflow/providers/snowflake/hooks/snowflake.py
index ae8acd89f4..f7f7a9a16f 100644
--- a/airflow/providers/snowflake/hooks/snowflake.py
+++ b/airflow/providers/snowflake/hooks/snowflake.py
@@ -440,6 +440,7 @@ class SnowflakeHook(DbApiHook):
"column_name",
"ordinal_position",
"data_type",
+ "table_catalog",
],
database=database,
is_information_schema_cross_db=True,
diff --git a/tests/providers/snowflake/operators/test_snowflake_sql.py
b/tests/providers/snowflake/operators/test_snowflake_sql.py
index 75edeba405..db32bddeb3 100644
--- a/tests/providers/snowflake/operators/test_snowflake_sql.py
+++ b/tests/providers/snowflake/operators/test_snowflake_sql.py
@@ -17,11 +17,18 @@
# under the License.
from __future__ import annotations
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock, call, patch
import pytest
from databricks.sql.types import Row
-from openlineage.client.facet import SchemaDatasetFacet, SchemaField, SqlJobFacet
+from openlineage.client.facet import (
+ ColumnLineageDatasetFacet,
+ ColumnLineageDatasetFacetFieldsAdditional,
+ ColumnLineageDatasetFacetFieldsAdditionalInputFields,
+ SchemaDatasetFacet,
+ SchemaField,
+ SqlJobFacet,
+)
from openlineage.client.run import Dataset
from airflow.models.connection import Connection
@@ -148,6 +155,9 @@ def test_execute_openlineage_events():
DB_NAME = "DATABASE"
DB_SCHEMA_NAME = "PUBLIC"
+ ANOTHER_DB_NAME = "ANOTHER_DB"
+ ANOTHER_DB_SCHEMA = "ANOTHER_SCHEMA"
+
class SnowflakeHookForTests(SnowflakeHook):
get_conn = MagicMock(name="conn")
get_connection = MagicMock()
@@ -161,17 +171,42 @@ def test_execute_openlineage_events():
def get_db_hook(self):
return dbapi_hook
- sql = """CREATE TABLE IF NOT EXISTS popular_orders_day_of_week (
- order_day_of_week VARCHAR(64) NOT NULL,
- order_placed_on TIMESTAMP NOT NULL,
- orders_placed INTEGER NOT NULL
- );
-FORGOT TO COMMENT"""
+ sql = (
+ "INSERT INTO Test_table\n"
+        "SELECT t1.*, t2.additional_constant FROM ANOTHER_db.another_schema.popular_orders_day_of_week t1\n"
+        "JOIN little_table t2 ON t1.order_day_of_week = t2.order_day_of_week;\n"
+ "FORGOT TO COMMENT"
+ )
+
op = SnowflakeOperatorForTest(task_id="snowflake-operator", sql=sql)
rows = [
- (DB_SCHEMA_NAME, "POPULAR_ORDERS_DAY_OF_WEEK", "ORDER_DAY_OF_WEEK", 1,
"TEXT"),
- (DB_SCHEMA_NAME, "POPULAR_ORDERS_DAY_OF_WEEK", "ORDER_PLACED_ON", 2,
"TIMESTAMP_NTZ"),
- (DB_SCHEMA_NAME, "POPULAR_ORDERS_DAY_OF_WEEK", "ORDERS_PLACED", 3,
"NUMBER"),
+ [
+ (
+ ANOTHER_DB_SCHEMA,
+ "POPULAR_ORDERS_DAY_OF_WEEK",
+ "ORDER_DAY_OF_WEEK",
+ 1,
+ "TEXT",
+ ANOTHER_DB_NAME,
+ ),
+ (
+ ANOTHER_DB_SCHEMA,
+ "POPULAR_ORDERS_DAY_OF_WEEK",
+ "ORDER_PLACED_ON",
+ 2,
+ "TIMESTAMP_NTZ",
+ ANOTHER_DB_NAME,
+ ),
+ (ANOTHER_DB_SCHEMA, "POPULAR_ORDERS_DAY_OF_WEEK", "ORDERS_PLACED",
3, "NUMBER", ANOTHER_DB_NAME),
+ (DB_SCHEMA_NAME, "LITTLE_TABLE", "ORDER_DAY_OF_WEEK", 1, "TEXT",
DB_NAME),
+ (DB_SCHEMA_NAME, "LITTLE_TABLE", "ADDITIONAL_CONSTANT", 2, "TEXT",
DB_NAME),
+ ],
+ [
+ (DB_SCHEMA_NAME, "TEST_TABLE", "ORDER_DAY_OF_WEEK", 1, "TEXT",
DB_NAME),
+ (DB_SCHEMA_NAME, "TEST_TABLE", "ORDER_PLACED_ON", 2,
"TIMESTAMP_NTZ", DB_NAME),
+ (DB_SCHEMA_NAME, "TEST_TABLE", "ORDERS_PLACED", 3, "NUMBER",
DB_NAME),
+ (DB_SCHEMA_NAME, "TEST_TABLE", "ADDITIONAL_CONSTANT", 4, "TEXT",
DB_NAME),
+ ],
]
dbapi_hook.get_connection.return_value = Connection(
conn_id="snowflake_default",
@@ -183,14 +218,37 @@ FORGOT TO COMMENT"""
"database": DB_NAME,
},
)
-    dbapi_hook.get_conn.return_value.cursor.return_value.fetchall.side_effect = [rows, []]
+    dbapi_hook.get_conn.return_value.cursor.return_value.fetchall.side_effect = rows
lineage = op.get_openlineage_facets_on_start()
- assert len(lineage.inputs) == 0
- assert lineage.outputs == [
+    assert dbapi_hook.get_conn.return_value.cursor.return_value.execute.mock_calls == [
+ call(
+ "SELECT database.information_schema.columns.table_schema,
database.information_schema.columns.table_name, "
+ "database.information_schema.columns.column_name,
database.information_schema.columns.ordinal_position, "
+ "database.information_schema.columns.data_type,
database.information_schema.columns.table_catalog \n"
+ "FROM database.information_schema.columns \n"
+ "WHERE database.information_schema.columns.table_name IN
('LITTLE_TABLE') "
+ "UNION ALL "
+ "SELECT another_db.information_schema.columns.table_schema,
another_db.information_schema.columns.table_name, "
+ "another_db.information_schema.columns.column_name,
another_db.information_schema.columns.ordinal_position, "
+ "another_db.information_schema.columns.data_type,
another_db.information_schema.columns.table_catalog \n"
+ "FROM another_db.information_schema.columns \n"
+ "WHERE another_db.information_schema.columns.table_schema =
'ANOTHER_SCHEMA' "
+ "AND another_db.information_schema.columns.table_name IN
('POPULAR_ORDERS_DAY_OF_WEEK')"
+ ),
+ call(
+ "SELECT database.information_schema.columns.table_schema,
database.information_schema.columns.table_name, "
+ "database.information_schema.columns.column_name,
database.information_schema.columns.ordinal_position, "
+ "database.information_schema.columns.data_type,
database.information_schema.columns.table_catalog \n"
+ "FROM database.information_schema.columns \n"
+ "WHERE database.information_schema.columns.table_name IN
('TEST_TABLE')"
+ ),
+ ]
+
+ assert lineage.inputs == [
Dataset(
namespace="snowflake://test_account.us-east.aws",
- name=f"{DB_NAME}.{DB_SCHEMA_NAME}.POPULAR_ORDERS_DAY_OF_WEEK",
+            name=f"{ANOTHER_DB_NAME}.{ANOTHER_DB_SCHEMA}.POPULAR_ORDERS_DAY_OF_WEEK",
facets={
"schema": SchemaDatasetFacet(
fields=[
@@ -200,6 +258,49 @@ FORGOT TO COMMENT"""
]
)
},
+ ),
+ Dataset(
+ namespace="snowflake://test_account.us-east.aws",
+ name=f"{DB_NAME}.{DB_SCHEMA_NAME}.LITTLE_TABLE",
+ facets={
+ "schema": SchemaDatasetFacet(
+ fields=[
+ SchemaField(name="ORDER_DAY_OF_WEEK", type="TEXT"),
+ SchemaField(name="ADDITIONAL_CONSTANT", type="TEXT"),
+ ]
+ )
+ },
+ ),
+ ]
+ assert lineage.outputs == [
+ Dataset(
+ namespace="snowflake://test_account.us-east.aws",
+ name=f"{DB_NAME}.{DB_SCHEMA_NAME}.TEST_TABLE",
+ facets={
+ "schema": SchemaDatasetFacet(
+ fields=[
+ SchemaField(name="ORDER_DAY_OF_WEEK", type="TEXT"),
+ SchemaField(name="ORDER_PLACED_ON",
type="TIMESTAMP_NTZ"),
+ SchemaField(name="ORDERS_PLACED", type="NUMBER"),
+ SchemaField(name="ADDITIONAL_CONSTANT", type="TEXT"),
+ ]
+ ),
+ "columnLineage": ColumnLineageDatasetFacet(
+ fields={
+ "additional_constant":
ColumnLineageDatasetFacetFieldsAdditional(
+ inputFields=[
+
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+
namespace="snowflake://test_account.us-east.aws",
+ name="DATABASE.PUBLIC.little_table",
+ field="additional_constant",
+ )
+ ],
+ transformationDescription="",
+ transformationType="",
+ )
+ }
+ ),
+ },
)
]