sekikn opened a new pull request #12505: URL: https://github.com/apache/airflow/pull/12505
Currently, users have to specify each file to load via the `s3_keys` parameter when using `S3ToSnowflakeOperator`. However, the `COPY INTO` statement, which `S3ToSnowflakeOperator` uses internally, allows this parameter to be omitted so that all files in the specified stage are loaded: https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#syntax

This PR makes `S3ToSnowflakeOperator`'s `s3_keys` parameter optional so as to support this functionality. In addition to adding a corresponding test case, I manually ran the revised version of `S3ToSnowflakeOperator` and confirmed that it works, as follows:

----

Before executing `S3ToSnowflakeOperator`, the stage contains two files and the target table has no records (via the Snowflake CLI):

```
$ bin/snowsql --accountname *******.ap-southeast-1 --username sekikn --dbname FOO --schemaname BAR
Password:
* SnowSQL * v1.2.10
Type SQL statements or !help
sekikn#[email protected]>LIST @S3_STG;
+-------------------------------------+------+----------------------------------+-------------------------------+
| name                                | size | md5                              | last_modified                 |
|-------------------------------------+------+----------------------------------+-------------------------------|
| s3://****/input/weather-snappy.avro |  330 | f1b4996b74a1a0de9e6ffa7d644f97a9 | Fri, 20 Nov 2020 02:42:15 GMT |
| s3://****/input/weather.avro        |  358 | 0a1cfdeca549207de10f431c8ed6dd4c | Fri, 20 Nov 2020 01:40:50 GMT |
+-------------------------------------+------+----------------------------------+-------------------------------+
2 Row(s) produced. Time Elapsed: 3.046s
sekikn#[email protected]>SELECT COUNT(*) FROM WEATHER;
+----------+
| COUNT(*) |
|----------|
|        0 |
+----------+
1 Row(s) produced.
Time Elapsed: 0.161s
```

Execute `S3ToSnowflakeOperator` without `s3_keys` (via the Python REPL):

```
In [1]: from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator
[2020-11-20 19:55:02,997] {arrow_result.pyx:0} INFO - Failed to import optional packages, pyarrow

In [2]: t = S3ToSnowflakeOperator(stage="FOO.BAR.S3_STG", file_format="(TYPE=AVRO)", schema="FOO.BAR", table="WEATHER", task_id="tid")

In [3]: t.execute(None)
[2020-11-20 19:55:13,968] {connection.py:210} INFO - Snowflake Connector for Python Version: 2.3.6, Python Version: 3.6.9, Platform: Linux-5.4.0-53-generic-x86_64-with-Ubuntu-18.04-bionic
[2020-11-20 19:55:13,968] {connection.py:744} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
[2020-11-20 19:55:13,968] {connection.py:760} INFO - Setting use_openssl_only mode to False
[2020-11-20 19:55:14,780] {cursor.py:531} INFO - query: [ALTER SESSION SET autocommit=True]
[2020-11-20 19:55:14,946] {cursor.py:553} INFO - query execution done
[2020-11-20 19:55:14,946] {dbapi_hook.py:179} INFO - Running statement: COPY INTO FOO.BAR.WEATHER FROM @FOO.BAR.S3_STG/ file_format=(TYPE=AVRO) , parameters: None
[2020-11-20 19:55:14,946] {cursor.py:531} INFO - query: [COPY INTO FOO.BAR.WEATHER FROM @FOO.BAR.S3_STG/ file_format=(TYPE=AVRO)]
[2020-11-20 19:55:19,840] {cursor.py:553} INFO - query execution done
[2020-11-20 19:55:19,840] {dbapi_hook.py:185} INFO - Rows affected: 2
[2020-11-20 19:55:19,840] {connection.py:430} INFO - closed
```

And all files are successfully loaded (via the Snowflake CLI):

```
sekikn#[email protected]>SELECT COUNT(*) FROM WEATHER;
+----------+
| COUNT(*) |
|----------|
|       10 |
+----------+
1 Row(s) produced. Time Elapsed: 0.746s
```
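For illustration, the query-building behavior this PR enables can be sketched as below. This is a minimal, self-contained sketch under the assumption that the statement mirrors the one in the log above; `build_copy_query` and its signature are illustrative, not the actual operator code:

```python
from typing import List, Optional


def build_copy_query(schema: str, table: str, stage: str,
                     file_format: str,
                     s3_keys: Optional[List[str]] = None) -> str:
    # Hypothetical helper: when s3_keys is given, restrict COPY INTO to
    # those files via a files=(...) clause; when it is omitted, leave the
    # clause out so Snowflake loads every file in the stage.
    files = ""
    if s3_keys:
        files = "files=({})".format(", ".join("'{}'".format(k) for k in s3_keys))
    return "COPY INTO {}.{} FROM @{}/ {} file_format={}".format(
        schema, table, stage, files, file_format
    )
```

With `s3_keys` omitted, this yields essentially the statement seen in the log above (`COPY INTO FOO.BAR.WEATHER FROM @FOO.BAR.S3_STG/ file_format=(TYPE=AVRO)`); with keys supplied, the `files=(...)` clause reappears, preserving the existing behavior.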
