This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6858ea46eb Make schema filter uppercase in `create_filter_clauses` (#35428)
6858ea46eb is described below

commit 6858ea46eb5282034b0695720d797dcb7ef91100
Author: Jakub Dardzinski <[email protected]>
AuthorDate: Sat Nov 4 00:26:02 2023 +0100

    Make schema filter uppercase in `create_filter_clauses` (#35428)
---
 airflow/providers/openlineage/utils/sql.py         |   1 +
 airflow/providers/snowflake/hooks/snowflake.py     |   1 +
 .../snowflake/operators/test_snowflake_sql.py      | 131 ++++++++++++++++++---
 3 files changed, 118 insertions(+), 15 deletions(-)

diff --git a/airflow/providers/openlineage/utils/sql.py 
b/airflow/providers/openlineage/utils/sql.py
index f0bfbff6f5..7bd6043040 100644
--- a/airflow/providers/openlineage/utils/sql.py
+++ b/airflow/providers/openlineage/utils/sql.py
@@ -193,6 +193,7 @@ def create_filter_clauses(
             name.upper() if uppercase_names else name for name in tables
         )
         if schema:
+            schema = schema.upper() if uppercase_names else schema
             filter_clause = and_(information_schema_table.c.table_schema == 
schema, filter_clause)
         filter_clauses.append(filter_clause)
     return filter_clauses
diff --git a/airflow/providers/snowflake/hooks/snowflake.py 
b/airflow/providers/snowflake/hooks/snowflake.py
index ae8acd89f4..f7f7a9a16f 100644
--- a/airflow/providers/snowflake/hooks/snowflake.py
+++ b/airflow/providers/snowflake/hooks/snowflake.py
@@ -440,6 +440,7 @@ class SnowflakeHook(DbApiHook):
                 "column_name",
                 "ordinal_position",
                 "data_type",
+                "table_catalog",
             ],
             database=database,
             is_information_schema_cross_db=True,
diff --git a/tests/providers/snowflake/operators/test_snowflake_sql.py 
b/tests/providers/snowflake/operators/test_snowflake_sql.py
index 75edeba405..db32bddeb3 100644
--- a/tests/providers/snowflake/operators/test_snowflake_sql.py
+++ b/tests/providers/snowflake/operators/test_snowflake_sql.py
@@ -17,11 +17,18 @@
 # under the License.
 from __future__ import annotations
 
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock, call, patch
 
 import pytest
 from databricks.sql.types import Row
-from openlineage.client.facet import SchemaDatasetFacet, SchemaField, 
SqlJobFacet
+from openlineage.client.facet import (
+    ColumnLineageDatasetFacet,
+    ColumnLineageDatasetFacetFieldsAdditional,
+    ColumnLineageDatasetFacetFieldsAdditionalInputFields,
+    SchemaDatasetFacet,
+    SchemaField,
+    SqlJobFacet,
+)
 from openlineage.client.run import Dataset
 
 from airflow.models.connection import Connection
@@ -148,6 +155,9 @@ def test_execute_openlineage_events():
     DB_NAME = "DATABASE"
     DB_SCHEMA_NAME = "PUBLIC"
 
+    ANOTHER_DB_NAME = "ANOTHER_DB"
+    ANOTHER_DB_SCHEMA = "ANOTHER_SCHEMA"
+
     class SnowflakeHookForTests(SnowflakeHook):
         get_conn = MagicMock(name="conn")
         get_connection = MagicMock()
@@ -161,17 +171,42 @@ def test_execute_openlineage_events():
         def get_db_hook(self):
             return dbapi_hook
 
-    sql = """CREATE TABLE IF NOT EXISTS popular_orders_day_of_week (
-        order_day_of_week VARCHAR(64) NOT NULL,
-        order_placed_on   TIMESTAMP NOT NULL,
-        orders_placed     INTEGER NOT NULL
-    );
-FORGOT TO COMMENT"""
+    sql = (
+        "INSERT INTO Test_table\n"
+        "SELECT t1.*, t2.additional_constant FROM 
ANOTHER_db.another_schema.popular_orders_day_of_week t1\n"
+        "JOIN little_table t2 ON t1.order_day_of_week = 
t2.order_day_of_week;\n"
+        "FORGOT TO COMMENT"
+    )
+
     op = SnowflakeOperatorForTest(task_id="snowflake-operator", sql=sql)
     rows = [
-        (DB_SCHEMA_NAME, "POPULAR_ORDERS_DAY_OF_WEEK", "ORDER_DAY_OF_WEEK", 1, 
"TEXT"),
-        (DB_SCHEMA_NAME, "POPULAR_ORDERS_DAY_OF_WEEK", "ORDER_PLACED_ON", 2, 
"TIMESTAMP_NTZ"),
-        (DB_SCHEMA_NAME, "POPULAR_ORDERS_DAY_OF_WEEK", "ORDERS_PLACED", 3, 
"NUMBER"),
+        [
+            (
+                ANOTHER_DB_SCHEMA,
+                "POPULAR_ORDERS_DAY_OF_WEEK",
+                "ORDER_DAY_OF_WEEK",
+                1,
+                "TEXT",
+                ANOTHER_DB_NAME,
+            ),
+            (
+                ANOTHER_DB_SCHEMA,
+                "POPULAR_ORDERS_DAY_OF_WEEK",
+                "ORDER_PLACED_ON",
+                2,
+                "TIMESTAMP_NTZ",
+                ANOTHER_DB_NAME,
+            ),
+            (ANOTHER_DB_SCHEMA, "POPULAR_ORDERS_DAY_OF_WEEK", "ORDERS_PLACED", 
3, "NUMBER", ANOTHER_DB_NAME),
+            (DB_SCHEMA_NAME, "LITTLE_TABLE", "ORDER_DAY_OF_WEEK", 1, "TEXT", 
DB_NAME),
+            (DB_SCHEMA_NAME, "LITTLE_TABLE", "ADDITIONAL_CONSTANT", 2, "TEXT", 
DB_NAME),
+        ],
+        [
+            (DB_SCHEMA_NAME, "TEST_TABLE", "ORDER_DAY_OF_WEEK", 1, "TEXT", 
DB_NAME),
+            (DB_SCHEMA_NAME, "TEST_TABLE", "ORDER_PLACED_ON", 2, 
"TIMESTAMP_NTZ", DB_NAME),
+            (DB_SCHEMA_NAME, "TEST_TABLE", "ORDERS_PLACED", 3, "NUMBER", 
DB_NAME),
+            (DB_SCHEMA_NAME, "TEST_TABLE", "ADDITIONAL_CONSTANT", 4, "TEXT", 
DB_NAME),
+        ],
     ]
     dbapi_hook.get_connection.return_value = Connection(
         conn_id="snowflake_default",
@@ -183,14 +218,37 @@ FORGOT TO COMMENT"""
             "database": DB_NAME,
         },
     )
-    dbapi_hook.get_conn.return_value.cursor.return_value.fetchall.side_effect 
= [rows, []]
+    dbapi_hook.get_conn.return_value.cursor.return_value.fetchall.side_effect 
= rows
 
     lineage = op.get_openlineage_facets_on_start()
-    assert len(lineage.inputs) == 0
-    assert lineage.outputs == [
+    assert 
dbapi_hook.get_conn.return_value.cursor.return_value.execute.mock_calls == [
+        call(
+            "SELECT database.information_schema.columns.table_schema, 
database.information_schema.columns.table_name, "
+            "database.information_schema.columns.column_name, 
database.information_schema.columns.ordinal_position, "
+            "database.information_schema.columns.data_type, 
database.information_schema.columns.table_catalog \n"
+            "FROM database.information_schema.columns \n"
+            "WHERE database.information_schema.columns.table_name IN 
('LITTLE_TABLE') "
+            "UNION ALL "
+            "SELECT another_db.information_schema.columns.table_schema, 
another_db.information_schema.columns.table_name, "
+            "another_db.information_schema.columns.column_name, 
another_db.information_schema.columns.ordinal_position, "
+            "another_db.information_schema.columns.data_type, 
another_db.information_schema.columns.table_catalog \n"
+            "FROM another_db.information_schema.columns \n"
+            "WHERE another_db.information_schema.columns.table_schema = 
'ANOTHER_SCHEMA' "
+            "AND another_db.information_schema.columns.table_name IN 
('POPULAR_ORDERS_DAY_OF_WEEK')"
+        ),
+        call(
+            "SELECT database.information_schema.columns.table_schema, 
database.information_schema.columns.table_name, "
+            "database.information_schema.columns.column_name, 
database.information_schema.columns.ordinal_position, "
+            "database.information_schema.columns.data_type, 
database.information_schema.columns.table_catalog \n"
+            "FROM database.information_schema.columns \n"
+            "WHERE database.information_schema.columns.table_name IN 
('TEST_TABLE')"
+        ),
+    ]
+
+    assert lineage.inputs == [
         Dataset(
             namespace="snowflake://test_account.us-east.aws",
-            name=f"{DB_NAME}.{DB_SCHEMA_NAME}.POPULAR_ORDERS_DAY_OF_WEEK",
+            
name=f"{ANOTHER_DB_NAME}.{ANOTHER_DB_SCHEMA}.POPULAR_ORDERS_DAY_OF_WEEK",
             facets={
                 "schema": SchemaDatasetFacet(
                     fields=[
@@ -200,6 +258,49 @@ FORGOT TO COMMENT"""
                     ]
                 )
             },
+        ),
+        Dataset(
+            namespace="snowflake://test_account.us-east.aws",
+            name=f"{DB_NAME}.{DB_SCHEMA_NAME}.LITTLE_TABLE",
+            facets={
+                "schema": SchemaDatasetFacet(
+                    fields=[
+                        SchemaField(name="ORDER_DAY_OF_WEEK", type="TEXT"),
+                        SchemaField(name="ADDITIONAL_CONSTANT", type="TEXT"),
+                    ]
+                )
+            },
+        ),
+    ]
+    assert lineage.outputs == [
+        Dataset(
+            namespace="snowflake://test_account.us-east.aws",
+            name=f"{DB_NAME}.{DB_SCHEMA_NAME}.TEST_TABLE",
+            facets={
+                "schema": SchemaDatasetFacet(
+                    fields=[
+                        SchemaField(name="ORDER_DAY_OF_WEEK", type="TEXT"),
+                        SchemaField(name="ORDER_PLACED_ON", 
type="TIMESTAMP_NTZ"),
+                        SchemaField(name="ORDERS_PLACED", type="NUMBER"),
+                        SchemaField(name="ADDITIONAL_CONSTANT", type="TEXT"),
+                    ]
+                ),
+                "columnLineage": ColumnLineageDatasetFacet(
+                    fields={
+                        "additional_constant": 
ColumnLineageDatasetFacetFieldsAdditional(
+                            inputFields=[
+                                
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+                                    
namespace="snowflake://test_account.us-east.aws",
+                                    name="DATABASE.PUBLIC.little_table",
+                                    field="additional_constant",
+                                )
+                            ],
+                            transformationDescription="",
+                            transformationType="",
+                        )
+                    }
+                ),
+            },
         )
     ]
 

Reply via email to