sekikn commented on a change in pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#discussion_r554579260
##########
File path: airflow/providers/snowflake/transfers/s3_to_snowflake.py
##########
@@ -71,16 +71,14 @@ def __init__(
def execute(self, context: Any) -> None:
snowflake_hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
- # Snowflake won't accept list of files it has to be tuple only.
- # but in python tuple([1]) = (1,) => which is invalid for snowflake
- files = str(self.s3_keys)
- files = files.replace('[', '(')
- files = files.replace(']', ')')
+ files = ""
+ if self.s3_keys:
+ files = "files=({})".format(", ".join(f"'{key}'" for key in self.s3_keys))
# we can extend this based on stage
base_sql = """
FROM @{stage}/
Review comment:
Thanks! My first thought was that users could specify the prefix within the
"stage" variable, but your design is actually cleaner, so I just added a
separate `prefix` parameter.
I manually confirmed that the parameter works, as follows.
Given an example S3 bucket with the following structure (each file contains
five records),
```
$ aws s3 ls --recursive s3://****/input
2020-11-20 10:40:25 0 input/
2021-01-10 23:04:49 0 input/subdir/
2021-01-10 23:39:35 335 input/subdir/weather-sorted.avro
2020-11-20 11:42:15 330 input/weather-snappy.avro
2020-11-20 10:40:50 358 input/weather.avro
```
and the following stage and target table:
```
sekikn#[email protected]>DESC STAGE S3_STG;
+--------------------+---------------------+---------------+------------------------------------------------+------------------+
| parent_property    | property            | property_type | property_value                                 | property_default |
|--------------------+---------------------+---------------+------------------------------------------------+------------------|
(snip)
| STAGE_COPY_OPTIONS | FORCE               | Boolean       | true                                           | false            |
| STAGE_LOCATION     | URL                 | String        | ["s3://****/input/"]                           |                  |
| STAGE_INTEGRATION  | STORAGE_INTEGRATION | String        | S3_INT                                         |                  |
(snip)
sekikn#[email protected]>DESC TABLE WEATHER;
+------+---------+--------+-------+---------+-------------+------------+-------+------------+---------+
| name | type    | kind   | null? | default | primary key | unique key | check | expression | comment |
|------+---------+--------+-------+---------+-------------+------------+-------+------------+---------|
| C    | VARIANT | COLUMN | Y     | NULL    | N           | N          | NULL  | NULL       | NULL    |
+------+---------+--------+-------+---------+-------------+------------+-------+------------+---------+
1 Row(s) produced. Time Elapsed: 0.568s
sekikn#[email protected]>SELECT COUNT(*) FROM WEATHER;
+----------+
| COUNT(*) |
|----------|
| 0 |
+----------+
1 Row(s) produced. Time Elapsed: 0.575s
```
First, I ran S3ToSnowflakeOperator without a prefix:
```
In [1]: from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator
/home/sekikn/venv/a/lib/python3.8/site-packages/snowflake/connector/options.py:78 UserWarning: You have an incompatible version of 'pyarrow' installed (2.0.0), please install a version that adheres to: 'pyarrow<0.18.0,>=0.17.0; extra == "pandas"'
In [2]: t = S3ToSnowflakeOperator(stage="FOO.BAR.S3_STG", file_format="(TYPE=AVRO)", schema="FOO.BAR", table="WEATHER", task_id="tid")
In [3]: t.execute(None)
[2021-01-10 23:46:01,029] {connection.py:206} INFO - Snowflake Connector for Python Version: 2.3.6, Python Version: 3.8.5, Platform: Linux-5.8.0-36-generic-x86_64-with-glibc2.29
[2021-01-10 23:46:01,030] {connection.py:743} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
[2021-01-10 23:46:01,030] {connection.py:759} INFO - Setting use_openssl_only mode to False
[2021-01-10 23:46:01,783] {cursor.py:530} INFO - query: [ALTER SESSION SET autocommit=True]
[2021-01-10 23:46:01,927] {cursor.py:553} INFO - query execution done
[2021-01-10 23:46:01,927] {dbapi.py:180} INFO - Running statement:
COPY INTO FOO.BAR.WEATHER
FROM @FOO.BAR.S3_STG/
file_format=(TYPE=AVRO)
, parameters: None
[2021-01-10 23:46:01,928] {cursor.py:530} INFO - query: [COPY INTO FOO.BAR.WEATHER FROM @FOO.BAR.S3_STG/ file_format=(TYPE=AVRO)]
[2021-01-10 23:46:02,657] {cursor.py:553} INFO - query execution done
[2021-01-10 23:46:02,658] {dbapi.py:186} INFO - Rows affected: 3
[2021-01-10 23:46:02,659] {connection.py:430} INFO - closed
```
and confirmed via snowsql that all 15 records (three files of five records each) were loaded:
```
sekikn#[email protected]>SELECT COUNT(*) FROM WEATHER;
+----------+
| COUNT(*) |
|----------|
| 15 |
+----------+
1 Row(s) produced. Time Elapsed: 1.126s
```
Then I ran S3ToSnowflakeOperator again, this time **with** a prefix:
```
In [4]: t = S3ToSnowflakeOperator(stage="FOO.BAR.S3_STG", prefix="subdir", file_format="(TYPE=AVRO)", schema="FOO.BAR", table="WEATHER", task_id="tid")
In [5]: t.execute(None)
[2021-01-10 23:47:56,478] {connection.py:206} INFO - Snowflake Connector for Python Version: 2.3.6, Python Version: 3.8.5, Platform: Linux-5.8.0-36-generic-x86_64-with-glibc2.29
[2021-01-10 23:47:56,478] {connection.py:743} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
[2021-01-10 23:47:57,155] {cursor.py:530} INFO - query: [ALTER SESSION SET autocommit=True]
[2021-01-10 23:47:57,290] {cursor.py:553} INFO - query execution done
[2021-01-10 23:47:57,290] {dbapi.py:180} INFO - Running statement:
COPY INTO FOO.BAR.WEATHER
FROM @FOO.BAR.S3_STG/subdir
file_format=(TYPE=AVRO)
, parameters: None
[2021-01-10 23:47:57,291] {cursor.py:530} INFO - query: [COPY INTO FOO.BAR.WEATHER FROM @FOO.BAR.S3_STG/subdir file_format=(TYPE=AVRO)]
[2021-01-10 23:47:57,947] {cursor.py:553} INFO - query execution done
[2021-01-10 23:47:57,948] {dbapi.py:186} INFO - Rows affected: 1
[2021-01-10 23:47:57,948] {connection.py:430} INFO - closed
```
and confirmed that only the five records from the file under `subdir` were added this time (the count went from 15 to 20):
```
sekikn#[email protected]>SELECT COUNT(*) FROM WEATHER;
+----------+
| COUNT(*) |
|----------|
| 20 |
+----------+
1 Row(s) produced. Time Elapsed: 0.628s
```
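For readers who want to see how the two logged statements differ without running against Snowflake, here is a minimal sketch that assembles the same `COPY INTO` text. `build_copy_sql` is a hypothetical helper written only for this illustration; it is not the operator's actual code, and its parameter names merely mirror the operator arguments used in the sessions above. It also assumes the `files=(...)` clause is formatted as in the diff under review.

```python
from typing import Optional, Sequence


def build_copy_sql(
    stage: str,
    table: str,
    file_format: str,
    schema: Optional[str] = None,
    prefix: Optional[str] = None,
    s3_keys: Optional[Sequence[str]] = None,
) -> str:
    """Hypothetical helper: assemble a COPY INTO statement as plain text."""
    # Qualify the table with the schema when one is given.
    target = f"{schema}.{table}" if schema else table
    # The prefix, if any, is appended directly after the stage's trailing slash.
    lines = [f"COPY INTO {target}", f"FROM @{stage}/{prefix or ''}"]
    if s3_keys:
        # Quote each key individually so a single key does not become "('a',)".
        quoted = ", ".join(f"'{key}'" for key in s3_keys)
        lines.append(f"files=({quoted})")
    lines.append(f"file_format={file_format}")
    return "\n".join(lines)


without_prefix = build_copy_sql(
    "FOO.BAR.S3_STG", "WEATHER", "(TYPE=AVRO)", schema="FOO.BAR"
)
with_prefix = build_copy_sql(
    "FOO.BAR.S3_STG", "WEATHER", "(TYPE=AVRO)", schema="FOO.BAR", prefix="subdir"
)
print(without_prefix)
print(with_prefix)
```

The only difference between the two results is the `FROM` line, which matches what the logs show: `@FOO.BAR.S3_STG/` versus `@FOO.BAR.S3_STG/subdir`.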