This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e81c3bab80f Add fail_on_file_not_exist option to SFTPToS3Operator (#44320)
e81c3bab80f is described below
commit e81c3bab80f4ec82c75c320465630d8b5bd93533
Author: Namkyu Kim <[email protected]>
AuthorDate: Thu Nov 28 07:20:16 2024 +0900
Add fail_on_file_not_exist option to SFTPToS3Operator (#44320)
* Add fail_on_file_not_exist option to SFTPToS3Operator
* Fixed test for fail_on_file_not_exist option.
* Remove else condition with fail_on_file_not_exist option
Co-authored-by: Ephraim Anierobi <[email protected]>
---------
Co-authored-by: Ephraim Anierobi <[email protected]>
---
.../providers/amazon/aws/transfers/sftp_to_s3.py | 12 +++++++
.../tests/amazon/aws/transfers/test_sftp_to_s3.py | 37 ++++++++++++++++++++++
2 files changed, 49 insertions(+)
diff --git a/providers/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py b/providers/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
index e44741dca9b..141785fb886 100644
--- a/providers/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
+++ b/providers/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
@@ -50,6 +50,8 @@ class SFTPToS3Operator(BaseOperator):
uploading the file to S3.
:param use_temp_file: If True, copies file first to local,
if False streams file from SFTP to S3.
+    :param fail_on_file_not_exist: If True, operator fails when file does not exist,
+        if False, operator will not fail and skips transfer. Default is True.
"""
template_fields: Sequence[str] = ("s3_key", "sftp_path", "s3_bucket")
@@ -63,6 +65,7 @@ class SFTPToS3Operator(BaseOperator):
sftp_conn_id: str = "ssh_default",
s3_conn_id: str = "aws_default",
use_temp_file: bool = True,
+ fail_on_file_not_exist: bool = True,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -72,6 +75,7 @@ class SFTPToS3Operator(BaseOperator):
self.s3_key = s3_key
self.s3_conn_id = s3_conn_id
self.use_temp_file = use_temp_file
+ self.fail_on_file_not_exist = fail_on_file_not_exist
@staticmethod
def get_s3_key(s3_key: str) -> str:
@@ -86,6 +90,14 @@ class SFTPToS3Operator(BaseOperator):
sftp_client = ssh_hook.get_conn().open_sftp()
+ try:
+ sftp_client.stat(self.sftp_path)
+ except FileNotFoundError:
+ if self.fail_on_file_not_exist:
+ raise
+            self.log.info("File %s not found on SFTP server. Skipping transfer.", self.sftp_path)
+ return
+
if self.use_temp_file:
with NamedTemporaryFile("w") as f:
sftp_client.get(self.sftp_path, f.name)
diff --git a/providers/tests/amazon/aws/transfers/test_sftp_to_s3.py b/providers/tests/amazon/aws/transfers/test_sftp_to_s3.py
index 7b812712d7e..e8fd3be4905 100644
--- a/providers/tests/amazon/aws/transfers/test_sftp_to_s3.py
+++ b/providers/tests/amazon/aws/transfers/test_sftp_to_s3.py
@@ -120,3 +120,40 @@ class TestSFTPToS3Operator:
conn.delete_object(Bucket=self.s3_bucket, Key=self.s3_key)
conn.delete_bucket(Bucket=self.s3_bucket)
assert not s3_hook.check_for_bucket(self.s3_bucket)
+
+ @pytest.mark.parametrize("fail_on_file_not_exist", [True, False])
+ @mock_aws
+ @conf_vars({("core", "enable_xcom_pickling"): "True"})
+ def test_sftp_to_s3_fail_on_file_not_exist(self, fail_on_file_not_exist):
+ s3_hook = S3Hook(aws_conn_id=None)
+ conn = boto3.client("s3")
+ conn.create_bucket(Bucket=self.s3_bucket)
+ assert s3_hook.check_for_bucket(self.s3_bucket)
+
+ if fail_on_file_not_exist:
+ with pytest.raises(FileNotFoundError):
+ SFTPToS3Operator(
+ s3_bucket=self.s3_bucket,
+ s3_key=self.s3_key,
+ sftp_path="/tmp/wrong_path.txt",
+ sftp_conn_id=SFTP_CONN_ID,
+ s3_conn_id=S3_CONN_ID,
+ fail_on_file_not_exist=fail_on_file_not_exist,
+ task_id="test_sftp_to_s3",
+ dag=self.dag,
+ ).execute(None)
+ else:
+ SFTPToS3Operator(
+ s3_bucket=self.s3_bucket,
+ s3_key=self.s3_key,
+ sftp_path=self.sftp_path,
+ sftp_conn_id=SFTP_CONN_ID,
+ s3_conn_id=S3_CONN_ID,
+ fail_on_file_not_exist=fail_on_file_not_exist,
+ task_id="test_sftp_to_s3",
+ dag=self.dag,
+ ).execute(None)
+
+ conn.delete_object(Bucket=self.s3_bucket, Key=self.s3_key)
+ conn.delete_bucket(Bucket=self.s3_bucket)
+ assert not s3_hook.check_for_bucket(self.s3_bucket)