This is an automated email from the ASF dual-hosted git repository.
milton0825 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new a546a10 Add Snowflake system test (#8422)
a546a10 is described below
commit a546a10b13b1f7a119071d8d2001cb17ccdcbbf7
Author: Daniel Huang <[email protected]>
AuthorDate: Sat May 16 14:04:12 2020 -0700
Add Snowflake system test (#8422)
---
airflow/providers/snowflake/BACKPORT_README.md | 47 +++++++++++++
.../providers/snowflake/example_dags/__init__.py | 16 +++++
.../snowflake/example_dags/example_snowflake.py | 81 ++++++++++++++++++++++
.../snowflake/operators/s3_to_snowflake.py | 2 +-
.../snowflake/operators/test_snowflake_system.py | 70 +++++++++++++++++++
5 files changed, 215 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/snowflake/BACKPORT_README.md b/airflow/providers/snowflake/BACKPORT_README.md
new file mode 100644
index 0000000..d453119
--- /dev/null
+++ b/airflow/providers/snowflake/BACKPORT_README.md
@@ -0,0 +1,47 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+## Changelog
+
+<!-- START doctoc generated TOC please keep comment here to allow auto update -->
+<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
+
+
+- [v2020.XX.XX](#v2020xxxx)
+ - [New operators](#new-operators)
+ - [Updated operators](#updated-operators)
+
+<!-- END doctoc generated TOC please keep comment here to allow auto update -->
+
+### v2020.XX.XX
+
+This is the first released version of the package.
+
+#### New operators
+
+This release includes one new operator, `S3ToSnowflakeTransfer`.
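+
+A minimal usage sketch, mirroring the example DAG added in this commit (the
+stage, table, and S3 key names are illustrative placeholders, not shipped
+defaults):
+
+```python
+from airflow.providers.snowflake.operators.s3_to_snowflake import S3ToSnowflakeTransfer
+
+copy_into_table = S3ToSnowflakeTransfer(
+    task_id='copy_into_table',
+    snowflake_conn_id='snowflake_default',
+    s3_keys=['example.json'],       # file already uploaded to the S3 stage
+    table='my_table',               # placeholder target table
+    schema='public',
+    stage='my_stage',               # placeholder Snowflake stage
+    file_format="(type = 'JSON')",
+    dag=dag,                        # assumes a surrounding DAG definition
+)
+```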
+
+#### Updated operators
+
+The operators in Airflow 2.0 have been moved to a new package. The following
+table shows the operators from Airflow 1.10 and their equivalents in Airflow 2.0:
+
+| Airflow 1.10 (`airflow.contrib.operators` package) | Airflow 2.0 (`airflow.providers.snowflake.operators` package) |
+|----------------------------------------------------|---------------------------------------------------------------|
+| snowflake_operator.SnowflakeOperator               | snowflake.SnowflakeOperator                                    |
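+
+For example, an import that previously read
+
+```python
+from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
+```
+
+now becomes:
+
+```python
+from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
+```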
diff --git a/airflow/providers/snowflake/example_dags/__init__.py b/airflow/providers/snowflake/example_dags/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/snowflake/example_dags/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/snowflake/example_dags/example_snowflake.py b/airflow/providers/snowflake/example_dags/example_snowflake.py
new file mode 100644
index 0000000..69ef187
--- /dev/null
+++ b/airflow/providers/snowflake/example_dags/example_snowflake.py
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example use of Snowflake related operators.
+"""
+import os
+
+from airflow import DAG
+from airflow.providers.snowflake.operators.s3_to_snowflake import S3ToSnowflakeTransfer
+from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
+from airflow.utils.dates import days_ago
+
+SNOWFLAKE_CONN_ID = os.environ.get('SNOWFLAKE_CONN_ID', 'snowflake_default')
+# TODO: should be able to rely on connection's schema, but currently param required by S3ToSnowflakeTransfer
+SNOWFLAKE_SCHEMA = os.environ.get('SNOWFLAKE_SCHEMA', 'public')
+SNOWFLAKE_STAGE = os.environ.get('SNOWFLAKE_STAGE', 'airflow')
+SNOWFLAKE_SAMPLE_TABLE = os.environ.get('SNOWFLAKE_SAMPLE_TABLE', 'snowflake_sample_data.tpch_sf001.orders')
+SNOWFLAKE_LOAD_TABLE = os.environ.get('SNOWFLAKE_LOAD_TABLE', 'airflow_example')
+SNOWFLAKE_LOAD_JSON_PATH = os.environ.get('SNOWFLAKE_LOAD_PATH', 'example.json')
+
+default_args = {
+ 'owner': 'airflow',
+ 'start_date': days_ago(2),
+}
+
+dag = DAG(
+ 'example_snowflake',
+ default_args=default_args,
+ tags=['example'],
+)
+
+select = SnowflakeOperator(
+ task_id='select',
+ snowflake_conn_id=SNOWFLAKE_CONN_ID,
+ sql="""
+ SELECT *
+ FROM {0}
+ LIMIT 100;
+ """.format(SNOWFLAKE_SAMPLE_TABLE),
+ dag=dag,
+)
+
+create_table = SnowflakeOperator(
+ task_id='create_table',
+ snowflake_conn_id=SNOWFLAKE_CONN_ID,
+ sql="""
+ CREATE TRANSIENT TABLE IF NOT EXISTS {0} (
+ data VARIANT
+ );
+ """.format(SNOWFLAKE_LOAD_TABLE),
+ schema=SNOWFLAKE_SCHEMA,
+ dag=dag,
+)
+
+copy_into_table = S3ToSnowflakeTransfer(
+ task_id='copy_into_table',
+ snowflake_conn_id=SNOWFLAKE_CONN_ID,
+ s3_keys=[SNOWFLAKE_LOAD_JSON_PATH],
+ table=SNOWFLAKE_LOAD_TABLE,
+ schema=SNOWFLAKE_SCHEMA,
+ stage=SNOWFLAKE_STAGE,
+ file_format="(type = 'JSON')",
+ dag=dag,
+)
+
+create_table >> copy_into_table
diff --git a/airflow/providers/snowflake/operators/s3_to_snowflake.py b/airflow/providers/snowflake/operators/s3_to_snowflake.py
index 973f9ba..165c5dc 100644
--- a/airflow/providers/snowflake/operators/s3_to_snowflake.py
+++ b/airflow/providers/snowflake/operators/s3_to_snowflake.py
@@ -51,7 +51,7 @@ class S3ToSnowflakeTransfer(BaseOperator):
table,
stage,
file_format,
- schema,
+ schema,  # TODO: shouldn't be required, rely on session/user defaults
columns_array=None,
autocommit=True,
snowflake_conn_id='snowflake_default',
diff --git a/tests/providers/snowflake/operators/test_snowflake_system.py b/tests/providers/snowflake/operators/test_snowflake_system.py
new file mode 100644
index 0000000..29a7c98
--- /dev/null
+++ b/tests/providers/snowflake/operators/test_snowflake_system.py
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+import os
+
+import pytest
+
+from airflow.models import Connection
+from airflow.utils import db
+from tests.test_utils import AIRFLOW_MAIN_FOLDER
+from tests.test_utils.system_tests_class import SystemTest
+
+CREDENTIALS_DIR = os.environ.get('CREDENTIALS_DIR', '/files/airflow-breeze-config/keys')
+SNOWFLAKE_KEY = 'snowflake.json'
+SNOWFLAKE_CREDENTIALS_PATH = os.path.join(CREDENTIALS_DIR, SNOWFLAKE_KEY)
+SNOWFLAKE_DAG_FOLDER = os.path.join(
+ AIRFLOW_MAIN_FOLDER, 'airflow', 'providers', 'snowflake', 'example_dags')
+
+
[email protected]_file(SNOWFLAKE_KEY)
[email protected]('snowflake')
+class SnowflakeExampleDagsSystemTest(SystemTest):
+
+ def setUp(self):
+ super().setUp()
+
+ if os.environ.get('RUN_AIRFLOW_1_10') == 'true':
+ with open(SNOWFLAKE_CREDENTIALS_PATH) as f:
+ # Example:
+ # {
+ # "account": "foo",
+ # "region": "us-west-2",
+ # "user": "airflow",
+ # "password": "secret",
+ # "warehouse": "shared",
+ # "database": "test",
+ # "schema": "public",
+ # "role": "airflow"
+ # }
+ credentials = json.load(f)
+
+ extra = {
+ 'account': credentials['account'],
+ 'region': credentials['region'],
+ 'role': credentials['role'],
+ 'warehouse': credentials['warehouse'],
+ 'database': credentials['database'],
+ }
+ conn = Connection(conn_id='snowflake_default', login=credentials['user'],
+ password=credentials['password'], schema=credentials['schema'],
+ extra=json.dumps(extra))
+ db.merge_conn(conn)
+
+ def test_dag_example(self):
+ self.run_dag('example_snowflake', SNOWFLAKE_DAG_FOLDER)