This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8a34d25049 S3ToSnowflakeOperator: escape single quote in s3_keys (#24607)
8a34d25049 is described below
commit 8a34d25049a060a035d4db4a49cd4a0d0b07fb0b
Author: Andrey Anshin <[email protected]>
AuthorDate: Sun Jun 26 19:21:30 2022 +0400
S3ToSnowflakeOperator: escape single quote in s3_keys (#24607)
---
.../snowflake/transfers/s3_to_snowflake.py | 2 +-
.../snowflake/transfers/test_s3_to_snowflake.py | 23 ++++++++++++++++++++++
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/snowflake/transfers/s3_to_snowflake.py b/airflow/providers/snowflake/transfers/s3_to_snowflake.py
index 6bb32f3e56..d97bcaf91c 100644
--- a/airflow/providers/snowflake/transfers/s3_to_snowflake.py
+++ b/airflow/providers/snowflake/transfers/s3_to_snowflake.py
@@ -125,7 +125,7 @@ class S3ToSnowflakeOperator(BaseOperator):
f"FROM @{self.stage}/{self.prefix or ''}",
]
if self.s3_keys:
- files = ", ".join(f"'{key}'" for key in self.s3_keys)
+ files = ", ".join(map(enclose_param, self.s3_keys))
sql_parts.append(f"files=({files})")
sql_parts.append(f"file_format={self.file_format}")
if self.pattern:
diff --git a/tests/providers/snowflake/transfers/test_s3_to_snowflake.py b/tests/providers/snowflake/transfers/test_s3_to_snowflake.py
index 413043cdaf..e60a21e41f 100644
--- a/tests/providers/snowflake/transfers/test_s3_to_snowflake.py
+++ b/tests/providers/snowflake/transfers/test_s3_to_snowflake.py
@@ -69,3 +69,26 @@ class TestS3ToSnowflakeTransfer:
mock_run.assert_called_once()
assert mock_run.call_args[0][0] == copy_query
+
+ @pytest.mark.parametrize("pattern", [None, '.*[.]csv'])
+ @pytest.mark.parametrize("files", [None, ["foo.csv", "bar.json", "spam.parquet", "egg.xml"]])
+ @mock.patch("airflow.providers.snowflake.transfers.s3_to_snowflake.enclose_param")
+ def test_escaping_in_operator(self, mock_enclose_fn, files, pattern):
+ mock_enclose_fn.return_value = "mock"
+ with mock.patch("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.run"):
+ S3ToSnowflakeOperator(
+ s3_keys=files,
+ table="mock",
+ stage="mock",
+ prefix="mock",
+ file_format="mock",
+ pattern=pattern,
+ task_id="task_id",
+ dag=None,
+ ).execute(None)
+
+ for file in files or []:
+ assert mock.call(file) in mock_enclose_fn.call_args_list
+
+ if pattern:
+ assert mock.call(pattern) in mock_enclose_fn.call_args_list