This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e81c3bab80f Add fail_on_file_not_exist option to SFTPToS3Operator (#44320)
e81c3bab80f is described below

commit e81c3bab80f4ec82c75c320465630d8b5bd93533
Author: Namkyu Kim <[email protected]>
AuthorDate: Thu Nov 28 07:20:16 2024 +0900

    Add fail_on_file_not_exist option to SFTPToS3Operator (#44320)
    
    * Add fail_on_file_not_exist option to SFTPToS3Operator
    
    * Fixed test for fail_on_file_not_exist option.
    
    * Remove else condition with fail_on_file_not_exist option
    
    Co-authored-by: Ephraim Anierobi <[email protected]>
    
    ---------
    
    Co-authored-by: Ephraim Anierobi <[email protected]>
---
 .../providers/amazon/aws/transfers/sftp_to_s3.py   | 12 +++++++
 .../tests/amazon/aws/transfers/test_sftp_to_s3.py  | 37 ++++++++++++++++++++++
 2 files changed, 49 insertions(+)

diff --git a/providers/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py b/providers/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
index e44741dca9b..141785fb886 100644
--- a/providers/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
+++ b/providers/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
@@ -50,6 +50,8 @@ class SFTPToS3Operator(BaseOperator):
         uploading the file to S3.
     :param use_temp_file: If True, copies file first to local,
         if False streams file from SFTP to S3.
+    :param fail_on_file_not_exist: If True, operator fails when file does not exist,
+        if False, operator will not fail and skips transfer. Default is True.
     """
 
     template_fields: Sequence[str] = ("s3_key", "sftp_path", "s3_bucket")
@@ -63,6 +65,7 @@ class SFTPToS3Operator(BaseOperator):
         sftp_conn_id: str = "ssh_default",
         s3_conn_id: str = "aws_default",
         use_temp_file: bool = True,
+        fail_on_file_not_exist: bool = True,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -72,6 +75,7 @@ class SFTPToS3Operator(BaseOperator):
         self.s3_key = s3_key
         self.s3_conn_id = s3_conn_id
         self.use_temp_file = use_temp_file
+        self.fail_on_file_not_exist = fail_on_file_not_exist
 
     @staticmethod
     def get_s3_key(s3_key: str) -> str:
@@ -86,6 +90,14 @@ class SFTPToS3Operator(BaseOperator):
 
         sftp_client = ssh_hook.get_conn().open_sftp()
 
+        try:
+            sftp_client.stat(self.sftp_path)
+        except FileNotFoundError:
+            if self.fail_on_file_not_exist:
+                raise
+            self.log.info("File %s not found on SFTP server. Skipping transfer.", self.sftp_path)
+            return
+
         if self.use_temp_file:
             with NamedTemporaryFile("w") as f:
                 sftp_client.get(self.sftp_path, f.name)
diff --git a/providers/tests/amazon/aws/transfers/test_sftp_to_s3.py b/providers/tests/amazon/aws/transfers/test_sftp_to_s3.py
index 7b812712d7e..e8fd3be4905 100644
--- a/providers/tests/amazon/aws/transfers/test_sftp_to_s3.py
+++ b/providers/tests/amazon/aws/transfers/test_sftp_to_s3.py
@@ -120,3 +120,40 @@ class TestSFTPToS3Operator:
         conn.delete_object(Bucket=self.s3_bucket, Key=self.s3_key)
         conn.delete_bucket(Bucket=self.s3_bucket)
         assert not s3_hook.check_for_bucket(self.s3_bucket)
+
+    @pytest.mark.parametrize("fail_on_file_not_exist", [True, False])
+    @mock_aws
+    @conf_vars({("core", "enable_xcom_pickling"): "True"})
+    def test_sftp_to_s3_fail_on_file_not_exist(self, fail_on_file_not_exist):
+        s3_hook = S3Hook(aws_conn_id=None)
+        conn = boto3.client("s3")
+        conn.create_bucket(Bucket=self.s3_bucket)
+        assert s3_hook.check_for_bucket(self.s3_bucket)
+
+        if fail_on_file_not_exist:
+            with pytest.raises(FileNotFoundError):
+                SFTPToS3Operator(
+                    s3_bucket=self.s3_bucket,
+                    s3_key=self.s3_key,
+                    sftp_path="/tmp/wrong_path.txt",
+                    sftp_conn_id=SFTP_CONN_ID,
+                    s3_conn_id=S3_CONN_ID,
+                    fail_on_file_not_exist=fail_on_file_not_exist,
+                    task_id="test_sftp_to_s3",
+                    dag=self.dag,
+                ).execute(None)
+        else:
+            SFTPToS3Operator(
+                s3_bucket=self.s3_bucket,
+                s3_key=self.s3_key,
+                sftp_path=self.sftp_path,
+                sftp_conn_id=SFTP_CONN_ID,
+                s3_conn_id=S3_CONN_ID,
+                fail_on_file_not_exist=fail_on_file_not_exist,
+                task_id="test_sftp_to_s3",
+                dag=self.dag,
+            ).execute(None)
+
+        conn.delete_object(Bucket=self.s3_bucket, Key=self.s3_key)
+        conn.delete_bucket(Bucket=self.s3_bucket)
+        assert not s3_hook.check_for_bucket(self.s3_bucket)

Reply via email to