sekikn opened a new pull request #12505:
URL: https://github.com/apache/airflow/pull/12505


   Currently, users have to specify every file to load via the `s3_keys`
   parameter when using S3ToSnowflakeOperator. However, the `COPY INTO`
   statement, which S3ToSnowflakeOperator uses internally, allows that
   file list to be omitted, in which case all files in the specified
   stage are loaded:
   https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#syntax
   
   This PR makes S3ToSnowflakeOperator's `s3_keys` parameter optional
   so as to support this behavior.
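   For illustration, the change essentially makes the `files=(...)` clause conditional when the `COPY INTO` statement is rendered. A minimal sketch of that logic (the function name and exact formatting here are illustrative, not the operator's actual code):

```python
def build_copy_into(table, stage, file_format, s3_keys=None):
    """Render a COPY INTO statement; omit the files clause when s3_keys is None."""
    files = ""
    if s3_keys:
        # Quote each key, e.g. files=('a.avro', 'b.avro')
        files = "files=({})".format(", ".join("'{}'".format(k) for k in s3_keys))
    return "COPY INTO {} FROM @{}/ {} file_format={}".format(
        table, stage, files, file_format)

# Without s3_keys, every file in the stage is loaded; this produces the
# same statement shape as the one visible in the execution log below.
print(build_copy_into("FOO.BAR.WEATHER", "FOO.BAR.S3_STG", "(TYPE=AVRO)"))
```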
   
   In addition to adding a corresponding test case, I manually ran the revised
   version of S3ToSnowflakeOperator and confirmed that it worked, as follows:
   
   ----
   
   Before executing S3ToSnowflakeOperator, the target table contains no records (via Snowflake CLI):
   
   ```
   $ bin/snowsql --accountname *******.ap-southeast-1 --username sekikn --dbname FOO --schemaname BAR
   Password: 
   * SnowSQL * v1.2.10
   Type SQL statements or !help
   sekikn#[email protected]>LIST @S3_STG;
   +-------------------------------------+------+----------------------------------+-------------------------------+
   | name                                | size | md5                              | last_modified                 |
   |-------------------------------------+------+----------------------------------+-------------------------------|
   | s3://****/input/weather-snappy.avro |  330 | f1b4996b74a1a0de9e6ffa7d644f97a9 | Fri, 20 Nov 2020 02:42:15 GMT |
   | s3://****/input/weather.avro        |  358 | 0a1cfdeca549207de10f431c8ed6dd4c | Fri, 20 Nov 2020 01:40:50 GMT |
   +-------------------------------------+------+----------------------------------+-------------------------------+
   2 Row(s) produced. Time Elapsed: 3.046s
   sekikn#[email protected]>SELECT COUNT(*) FROM WEATHER;
   +----------+
   | COUNT(*) |
   |----------|
   |        0 |
   +----------+
   1 Row(s) produced. Time Elapsed: 0.161s
   ```
   
   Execute S3ToSnowflakeOperator without s3_keys (via Python REPL):
   
   ```
   In [1]: from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator
   [2020-11-20 19:55:02,997] {arrow_result.pyx:0} INFO - Failed to import optional packages, pyarrow
   
   In [2]: t = S3ToSnowflakeOperator(stage="FOO.BAR.S3_STG", file_format="(TYPE=AVRO)", schema="FOO.BAR", table="WEATHER", task_id="tid")
   
   In [3]: t.execute(None)
   [2020-11-20 19:55:13,968] {connection.py:210} INFO - Snowflake Connector for Python Version: 2.3.6, Python Version: 3.6.9, Platform: Linux-5.4.0-53-generic-x86_64-with-Ubuntu-18.04-bionic
   [2020-11-20 19:55:13,968] {connection.py:744} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
   [2020-11-20 19:55:13,968] {connection.py:760} INFO - Setting use_openssl_only mode to False
   [2020-11-20 19:55:14,780] {cursor.py:531} INFO - query: [ALTER SESSION SET autocommit=True]
   [2020-11-20 19:55:14,946] {cursor.py:553} INFO - query execution done
   [2020-11-20 19:55:14,946] {dbapi_hook.py:179} INFO - Running statement: 
                   COPY INTO FOO.BAR.WEATHER 
                       FROM @FOO.BAR.S3_STG/
                       
                       file_format=(TYPE=AVRO)
                   
               , parameters: None
   [2020-11-20 19:55:14,946] {cursor.py:531} INFO - query: [COPY INTO FOO.BAR.WEATHER FROM @FOO.BAR.S3_STG/  file_format=(TYPE=AVRO)]
   [2020-11-20 19:55:19,840] {cursor.py:553} INFO - query execution done
   [2020-11-20 19:55:19,840] {dbapi_hook.py:185} INFO - Rows affected: 2
   [2020-11-20 19:55:19,840] {connection.py:430} INFO - closed
   ```
   
   And all files in the stage are successfully loaded (via Snowflake CLI):
   
   ```
   sekikn#[email protected]>SELECT COUNT(*) FROM WEATHER;
   +----------+
   | COUNT(*) |
   |----------|
   |       10 |
   +----------+
   1 Row(s) produced. Time Elapsed: 0.746s
   ```
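   The added test's core assertion — that omitting `s3_keys` drops the files clause from the statement handed to the hook — can be sketched standalone as follows. `FakeS3ToSnowflake` and its `hook.run` interface are hypothetical stand-ins for the real operator and hook, not the actual test code:

```python
from unittest import mock

class FakeS3ToSnowflake:
    """Minimal stand-in: renders a COPY INTO statement and hands it to a hook."""
    def __init__(self, table, stage, file_format, s3_keys=None):
        self.table, self.stage = table, stage
        self.file_format, self.s3_keys = file_format, s3_keys

    def execute(self, hook):
        files = ""
        if self.s3_keys:
            files = "files=({})".format(", ".join("'{}'".format(k) for k in self.s3_keys))
        hook.run("COPY INTO {} FROM @{}/ {} file_format={}".format(
            self.table, self.stage, files, self.file_format))

# With s3_keys omitted, the SQL captured by the mocked hook has no files clause.
hook = mock.Mock()
FakeS3ToSnowflake("FOO.BAR.WEATHER", "FOO.BAR.S3_STG", "(TYPE=AVRO)").execute(hook)
rendered = hook.run.call_args[0][0]
assert "files=" not in rendered
```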
   
   
   

