This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 64dd5d6b429 Fix inconsistency in S3 transfer operators (#67378)
64dd5d6b429 is described below

commit 64dd5d6b429f62492aa115cba81ce799d833b9db
Author: Yuseok Jo <[email protected]>
AuthorDate: Fri May 29 03:11:49 2026 +0900

    Fix inconsistency in S3 transfer operators (#67378)
---
 .../providers/amazon/aws/transfers/ftp_to_s3.py    |  41 +++---
 .../providers/amazon/aws/transfers/s3_to_ftp.py    |  80 +++++++++--
 .../providers/amazon/aws/transfers/s3_to_sftp.py   |  92 +++++++++++--
 .../providers/amazon/aws/transfers/sftp_to_s3.py   | 148 +++++++++++++++++----
 .../unit/amazon/aws/transfers/test_ftp_to_s3.py    |  35 +++++
 .../unit/amazon/aws/transfers/test_s3_to_ftp.py    |  61 ++++++++-
 .../unit/amazon/aws/transfers/test_s3_to_sftp.py   |  62 +++++++++
 .../unit/amazon/aws/transfers/test_sftp_to_s3.py   | 104 ++++++++++++++-
 8 files changed, 557 insertions(+), 66 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/ftp_to_s3.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/ftp_to_s3.py
index 251c16a5e26..5dd933a197b 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/ftp_to_s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/ftp_to_s3.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import ftplib
 from collections.abc import Sequence
 from tempfile import NamedTemporaryFile
 from typing import TYPE_CHECKING
@@ -60,6 +61,9 @@ class FTPToS3Operator(BaseOperator):
     :param gzip: If True, the file will be compressed locally
     :param acl_policy: String specifying the canned ACL policy for the file being
         uploaded to the S3 bucket.
+    :param fail_on_file_not_exist: If True, operator fails when a source file does not
+        exist on the FTP server (FTP reply 550). If False, the operator logs a message
+        and skips the transfer. Default is True.
     """
 
     template_fields: Sequence[str] = ("ftp_path", "s3_bucket", "s3_key", "ftp_filenames", "s3_filenames")
@@ -78,6 +82,7 @@ class FTPToS3Operator(BaseOperator):
         encrypt: bool = False,
         gzip: bool = False,
         acl_policy: str | None = None,
+        fail_on_file_not_exist: bool = True,
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -92,25 +97,31 @@ class FTPToS3Operator(BaseOperator):
         self.encrypt = encrypt
         self.gzip = gzip
         self.acl_policy = acl_policy
+        self.fail_on_file_not_exist = fail_on_file_not_exist
         self.s3_hook: S3Hook | None = None
         self.ftp_hook: FTPHook | None = None
 
     def __upload_to_s3_from_ftp(self, remote_filename, s3_file_key):
-        with NamedTemporaryFile() as local_tmp_file:
-            self.ftp_hook.retrieve_file(
-                remote_full_path=remote_filename, local_full_path_or_buffer=local_tmp_file.name
-            )
-
-            self.s3_hook.load_file(
-                filename=local_tmp_file.name,
-                key=s3_file_key,
-                bucket_name=self.s3_bucket,
-                replace=self.replace,
-                encrypt=self.encrypt,
-                gzip=self.gzip,
-                acl_policy=self.acl_policy,
-            )
-            self.log.info("File upload to %s", s3_file_key)
+        try:
+            with NamedTemporaryFile() as local_tmp_file:
+                self.ftp_hook.retrieve_file(
+                    remote_full_path=remote_filename, local_full_path_or_buffer=local_tmp_file.name
+                )
+                self.s3_hook.load_file(
+                    filename=local_tmp_file.name,
+                    key=s3_file_key,
+                    bucket_name=self.s3_bucket,
+                    replace=self.replace,
+                    encrypt=self.encrypt,
+                    gzip=self.gzip,
+                    acl_policy=self.acl_policy,
+                )
+                self.log.info("File upload to %s", s3_file_key)
+        except ftplib.error_perm as e:
+            if str(e).startswith("550") and not self.fail_on_file_not_exist:
+                self.log.info("File %s not found on FTP server. Skipping transfer.", remote_filename)
+                return
+            raise
 
     def execute(self, context: Context):
         self.ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_ftp.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_ftp.py
index 2a0a4fb91e8..ad532e9ff1d 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_ftp.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_ftp.py
@@ -39,16 +39,27 @@ class S3ToFTPOperator(BaseOperator):
 
     :param s3_bucket: The targeted s3 bucket. This is the S3 bucket from
         where the file is downloaded.
-    :param s3_key: The targeted s3 key. This is the specified file path for
-        downloading the file from S3.
-    :param ftp_path: The ftp remote path. This is the specified file path for
-        uploading file to the FTP server.
+    :param s3_key: The targeted s3 key. For a single file it must include the file
+        path. For multiple files it is the key prefix (directory) and must end with
+        ``"/"``.
+    :param s3_filenames: Only used if you want to move multiple files. You can pass
+        a list with exact key suffixes present under the s3_key prefix, or a string
+        that the key suffixes must contain. Use ``"*"`` to move all objects under the
+        s3_key prefix. Folder-marker keys (ending in ``"/"``) are skipped.
+    :param ftp_path: The ftp remote path. For a single file it must include the file
+        path. For multiple files it is the destination directory path and must end
+        with ``"/"``.
+    :param ftp_filenames: Only used if you want to move multiple files and name them
+        differently at the destination. It can be a list of filenames (same length as
+        s3_filenames) or a string that replaces the s3_filenames string in each name.
     :param aws_conn_id: reference to a specific AWS connection
     :param ftp_conn_id: The ftp connection id. The name or identifier for
         establishing a connection to the FTP server.
+    :param fail_on_file_not_exist: If True, operator fails when a source S3 key does not
+        exist. If False, the operator logs a message and skips the transfer. Default is True.
     """
 
-    template_fields: Sequence[str] = ("s3_bucket", "s3_key", "ftp_path")
+    template_fields: Sequence[str] = ("s3_bucket", "s3_key", "ftp_path", "s3_filenames", "ftp_filenames")
 
     def __init__(
         self,
@@ -56,26 +67,71 @@ class S3ToFTPOperator(BaseOperator):
         s3_bucket,
         s3_key,
         ftp_path,
+        s3_filenames: str | list[str] | None = None,
+        ftp_filenames: str | list[str] | None = None,
         aws_conn_id="aws_default",
         ftp_conn_id="ftp_default",
+        fail_on_file_not_exist: bool = True,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
         self.s3_bucket = s3_bucket
         self.s3_key = s3_key
         self.ftp_path = ftp_path
+        self.s3_filenames = s3_filenames
+        self.ftp_filenames = ftp_filenames
         self.aws_conn_id = aws_conn_id
         self.ftp_conn_id = ftp_conn_id
+        self.fail_on_file_not_exist = fail_on_file_not_exist
+
+    def _download_from_s3(self, s3_hook: S3Hook, ftp_hook: FTPHook, s3_key: str, ftp_path: str) -> None:
+        if not s3_hook.check_for_key(s3_key, self.s3_bucket):
+            if self.fail_on_file_not_exist:
+                raise FileNotFoundError(f"Key {s3_key!r} not found in S3 bucket {self.s3_bucket!r}")
+            self.log.info("Key %s not found in S3. Skipping transfer.", s3_key)
+            return
+        s3_obj = s3_hook.get_key(s3_key, self.s3_bucket)
+        with NamedTemporaryFile() as local_tmp_file:
+            self.log.info("Downloading file from %s", s3_key)
+            s3_obj.download_fileobj(local_tmp_file)
+            local_tmp_file.seek(0)
+            ftp_hook.store_file(ftp_path, local_tmp_file.name)
+            self.log.info("File stored in %s", ftp_path)
 
     def execute(self, context: Context):
         s3_hook = S3Hook(self.aws_conn_id)
         ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)
 
-        s3_obj = s3_hook.get_key(self.s3_key, self.s3_bucket)
+        if self.s3_filenames:
+            if isinstance(self.s3_filenames, str):
+                self.log.info("Getting files in s3://%s/%s", self.s3_bucket, self.s3_key)
+                all_keys = s3_hook.list_keys(bucket_name=self.s3_bucket, prefix=self.s3_key) or []
+                filenames = [k[len(self.s3_key) :] for k in all_keys if not k.endswith("/")]
+                if self.s3_filenames == "*":
+                    files = filenames
+                else:
+                    s3_prefix: str = self.s3_filenames
+                    files = [f for f in filenames if s3_prefix in f]
 
-        with NamedTemporaryFile() as local_tmp_file:
-            self.log.info("Downloading file from %s", self.s3_key)
-            s3_obj.download_fileobj(local_tmp_file)
-            local_tmp_file.seek(0)
-            ftp_hook.store_file(self.ftp_path, local_tmp_file.name)
-            self.log.info("File stored in %s", {self.ftp_path})
+                for file in files:
+                    self.log.info("Moving file %s", file)
+                    if self.ftp_filenames and isinstance(self.ftp_filenames, str):
+                        ftp_filename = file.replace(self.s3_filenames, self.ftp_filenames)
+                    else:
+                        ftp_filename = file
+                    self._download_from_s3(
+                        s3_hook, ftp_hook, self.s3_key + file, self.ftp_path + ftp_filename
+                    )
+            else:
+                if self.ftp_filenames:
+                    for s3_file, ftp_file in zip(self.s3_filenames, self.ftp_filenames, strict=True):
+                        self._download_from_s3(
+                            s3_hook, ftp_hook, self.s3_key + s3_file, self.ftp_path + ftp_file
+                        )
+                else:
+                    for s3_file in self.s3_filenames:
+                        self._download_from_s3(
+                            s3_hook, ftp_hook, self.s3_key + s3_file, self.ftp_path + s3_file
+                        )
+        else:
+            self._download_from_s3(s3_hook, ftp_hook, self.s3_key, self.ftp_path)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sftp.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sftp.py
index 87d8454af96..b44d6fd82b9 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sftp.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sftp.py
@@ -27,6 +27,8 @@ from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.ssh.hooks.ssh import SSHHook
 
 if TYPE_CHECKING:
+    import paramiko
+
     from airflow.sdk import Context
 
 
@@ -40,8 +42,9 @@ class S3ToSFTPOperator(BaseOperator):
 
     :param sftp_conn_id: The sftp connection id. The name or identifier for
         establishing a connection to the SFTP server.
-    :param sftp_path: The sftp remote path. This is the specified file path for
-        uploading file to the SFTP server.
+    :param sftp_path: The sftp remote path. For a single file it must include the
+        file path. For multiple files it is the destination directory path and must
+        end with ``"/"``.
     :param sftp_remote_host: The remote host of the SFTP server. Overrides host in
         Connection.
     :param aws_conn_id: The Airflow connection used for AWS credentials.
@@ -51,14 +54,24 @@ class S3ToSFTPOperator(BaseOperator):
         maintained on each worker node).
     :param s3_bucket: The targeted s3 bucket. This is the S3 bucket from
         where the file is downloaded.
-    :param s3_key: The targeted s3 key. This is the specified file path for
-        downloading the file from S3.
+    :param s3_key: The targeted s3 key. For a single file it must include the file
+        path. For multiple files it is the key prefix (directory) and must end with
+        ``"/"``.
+    :param s3_filenames: Only used if you want to move multiple files. You can pass
+        a list with exact key suffixes present under the s3_key prefix, or a string
+        that the key suffixes must contain. Use ``"*"`` to move all objects under the
+        s3_key prefix. Folder-marker keys (ending in ``"/"``) are skipped.
+    :param sftp_filenames: Only used if you want to move multiple files and name them
+        differently at the destination. It can be a list of filenames (same length as
+        s3_filenames) or a string that replaces the s3_filenames string in each name.
     :param confirm: specify if the SFTP operation should be confirmed, defaults to True.
         When True, a stat will be performed on the remote file after upload to verify
         the file size matches and confirm successful transfer.
+    :param fail_on_file_not_exist: If True, operator fails when a source S3 key does not
+        exist. If False, the operator logs a message and skips the transfer. Default is True.
     """
 
-    template_fields: Sequence[str] = ("s3_key", "sftp_path", "s3_bucket")
+    template_fields: Sequence[str] = ("s3_key", "sftp_path", "s3_bucket", "s3_filenames", "sftp_filenames")
 
     def __init__(
         self,
@@ -69,7 +82,10 @@ class S3ToSFTPOperator(BaseOperator):
         sftp_conn_id: str = "ssh_default",
         sftp_remote_host: str = "",
         aws_conn_id: str | None = "aws_default",
+        s3_filenames: str | list[str] | None = None,
+        sftp_filenames: str | list[str] | None = None,
         confirm: bool = True,
+        fail_on_file_not_exist: bool = True,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -79,7 +95,10 @@ class S3ToSFTPOperator(BaseOperator):
         self.s3_key = s3_key
         self.sftp_remote_host = sftp_remote_host
         self.aws_conn_id = aws_conn_id
+        self.s3_filenames = s3_filenames
+        self.sftp_filenames = sftp_filenames
         self.confirm = confirm
+        self.fail_on_file_not_exist = fail_on_file_not_exist
 
     @staticmethod
     def get_s3_key(s3_key: str) -> str:
@@ -87,16 +106,69 @@ class S3ToSFTPOperator(BaseOperator):
         parsed_s3_key = urlsplit(s3_key)
         return parsed_s3_key.path.lstrip("/")
 
+    def _download_from_s3(
+        self,
+        sftp_client: paramiko.SFTPClient,
+        s3_hook: S3Hook,
+        s3_key: str,
+        sftp_path: str,
+    ) -> None:
+        if not s3_hook.check_for_key(s3_key, self.s3_bucket):
+            if self.fail_on_file_not_exist:
+                raise FileNotFoundError(f"Key {s3_key!r} not found in S3 bucket {self.s3_bucket!r}")
+            self.log.info("Key %s not found in S3. Skipping transfer.", s3_key)
+            return
+        with NamedTemporaryFile("w") as f:
+            s3_hook.get_conn().download_file(self.s3_bucket, s3_key, f.name)
+            sftp_client.put(f.name, sftp_path, confirm=self.confirm)
+
     def execute(self, context: Context) -> None:
         self.s3_key = self.get_s3_key(self.s3_key)
 
         # SSHHook will handle a None/"" sftp_remote_host
         ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id, remote_host=self.sftp_remote_host)
         s3_hook = S3Hook(self.aws_conn_id)
-
-        s3_client = s3_hook.get_conn()
         sftp_client = ssh_hook.get_conn().open_sftp()
 
-        with NamedTemporaryFile("w") as f:
-            s3_client.download_file(self.s3_bucket, self.s3_key, f.name)
-            sftp_client.put(f.name, self.sftp_path, confirm=self.confirm)
+        if self.s3_filenames:
+            if isinstance(self.s3_filenames, str):
+                self.log.info("Getting files in s3://%s/%s", self.s3_bucket, self.s3_key)
+                all_keys = s3_hook.list_keys(bucket_name=self.s3_bucket, prefix=self.s3_key) or []
+                filenames = [k[len(self.s3_key) :] for k in all_keys if not k.endswith("/")]
+                if self.s3_filenames == "*":
+                    files = filenames
+                else:
+                    s3_prefix: str = self.s3_filenames
+                    files = [f for f in filenames if s3_prefix in f]
+
+                for file in files:
+                    self.log.info("Moving file %s", file)
+                    if self.sftp_filenames and isinstance(self.sftp_filenames, str):
+                        sftp_filename = file.replace(self.s3_filenames, self.sftp_filenames)
+                    else:
+                        sftp_filename = file
+                    self._download_from_s3(
+                        sftp_client,
+                        s3_hook,
+                        self.s3_key + file,
+                        self.sftp_path + sftp_filename,
+                    )
+            else:
+                if self.sftp_filenames:
+                    for s3_file, sftp_file in zip(self.s3_filenames, self.sftp_filenames, strict=True):
+                        self._download_from_s3(
+                            sftp_client,
+                            s3_hook,
+                            self.s3_key + s3_file,
+                            self.sftp_path + sftp_file,
+                        )
+                else:
+                    for s3_file in self.s3_filenames:
+                        self._download_from_s3(
+                            sftp_client,
+                            s3_hook,
+                            self.s3_key + s3_file,
+                            self.sftp_path + s3_file,
+                        )
+        else:
+            self._download_from_s3(sftp_client, s3_hook, self.s3_key, self.sftp_path)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
index 4897ccca25c..1ee19aed36a 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
@@ -17,16 +17,20 @@
 # under the License.
 from __future__ import annotations
 
+import warnings
 from collections.abc import Sequence
 from tempfile import NamedTemporaryFile
 from typing import TYPE_CHECKING
 from urllib.parse import urlsplit
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.ssh.hooks.ssh import SSHHook
 
 if TYPE_CHECKING:
+    import paramiko
+
     from airflow.sdk import Context
 
 
@@ -42,21 +46,35 @@ class SFTPToS3Operator(BaseOperator):
         establishing a connection to the SFTP server.
     :param sftp_remote_host: The remote host of the SFTP server. Overrides host in
         Connection.
-    :param sftp_path: The sftp remote path. This is the specified file path
-        for downloading the file from the SFTP server.
-    :param s3_conn_id: The s3 connection id. The name or identifier for
-        establishing a connection to S3
+    :param sftp_path: The sftp remote path. For a single file it must include the
+        file path. For multiple files it is the directory path where the files are
+        located (a trailing ``"/"`` is optional).
+    :param sftp_filenames: Only used if you want to move multiple files. You can pass
+        a list with exact filenames present in the sftp path, or a string that the
+        filenames must contain. Use ``"*"`` to move all files within the sftp path.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
     :param s3_bucket: The targeted s3 bucket. This is the S3 bucket to where
         the file is uploaded.
-    :param s3_key: The targeted s3 key. This is the specified path for
-        uploading the file to S3.
+    :param s3_key: The targeted s3 key. For a single file it must include the file
+        path. For multiple files it must end with ``"/"``.
+    :param s3_filenames: Only used if you want to move multiple files and name them
+        differently from the originals on the SFTP server. It can be a list of filenames
+        (same length as sftp_filenames) or a string that replaces the sftp_filenames string.
     :param use_temp_file: If True, copies file first to local,
         if False streams file from SFTP to S3.
     :param fail_on_file_not_exist: If True, operator fails when file does not exist,
         if False, operator will not fail and skips transfer. Default is True.
+    :param replace: If True (default), overwrite the S3 key if it already exists.
+    :param encrypt: If True, the file will be encrypted on the server-side by S3.
+    :param gzip: If True, compress the file locally before upload; ignored if use_temp_file=False.
+    :param acl_policy: Canned ACL policy for the file being uploaded to S3.
     """
 
-    template_fields: Sequence[str] = ("s3_key", "sftp_path", "s3_bucket")
+    template_fields: Sequence[str] = ("s3_key", "sftp_path", "s3_bucket", "sftp_filenames", "s3_filenames")
 
     def __init__(
         self,
@@ -66,20 +84,40 @@ class SFTPToS3Operator(BaseOperator):
         sftp_path: str,
         sftp_conn_id: str = "ssh_default",
         sftp_remote_host: str = "",
-        s3_conn_id: str = "aws_default",
+        sftp_filenames: str | list[str] | None = None,
+        s3_filenames: str | list[str] | None = None,
         use_temp_file: bool = True,
         fail_on_file_not_exist: bool = True,
+        replace: bool = True,
+        encrypt: bool = False,
+        gzip: bool = False,
+        acl_policy: str | None = None,
+        aws_conn_id: str | None = "aws_default",
+        s3_conn_id: str | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
+        if s3_conn_id is not None:
+            warnings.warn(
+                "The s3_conn_id parameter is deprecated. Use aws_conn_id instead.",
+                AirflowProviderDeprecationWarning,
+                stacklevel=2,
+            )
+            aws_conn_id = s3_conn_id
         self.sftp_conn_id = sftp_conn_id
         self.sftp_path = sftp_path
         self.sftp_remote_host = sftp_remote_host
         self.s3_bucket = s3_bucket
         self.s3_key = s3_key
-        self.s3_conn_id = s3_conn_id
+        self.aws_conn_id = aws_conn_id
+        self.sftp_filenames = sftp_filenames
+        self.s3_filenames = s3_filenames
         self.use_temp_file = use_temp_file
         self.fail_on_file_not_exist = fail_on_file_not_exist
+        self.replace = replace
+        self.encrypt = encrypt
+        self.gzip = gzip
+        self.acl_policy = acl_policy
 
     @staticmethod
     def get_s3_key(s3_key: str) -> str:
@@ -87,28 +125,95 @@ class SFTPToS3Operator(BaseOperator):
         parsed_s3_key = urlsplit(s3_key)
         return parsed_s3_key.path.lstrip("/")
 
-    def execute(self, context: Context) -> None:
-        self.s3_key = self.get_s3_key(self.s3_key)
-
-        # SSHHook will handle a None/"" sftp_remote_host
-        ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id, remote_host=self.sftp_remote_host)
-        s3_hook = S3Hook(self.s3_conn_id)
-
-        sftp_client = ssh_hook.get_conn().open_sftp()
-
+    def _upload_to_s3(
+        self,
+        sftp_client: paramiko.SFTPClient,
+        s3_hook: S3Hook,
+        sftp_path: str,
+        s3_key: str,
+    ) -> None:
         try:
-            sftp_client.stat(self.sftp_path)
+            sftp_client.stat(sftp_path)
         except FileNotFoundError:
             if self.fail_on_file_not_exist:
                 raise
-            self.log.info("File %s not found on SFTP server. Skipping transfer.", self.sftp_path)
+            self.log.info("File %s not found on SFTP server. Skipping transfer.", sftp_path)
             return
 
         if self.use_temp_file:
             with NamedTemporaryFile("w") as f:
-                sftp_client.get(self.sftp_path, f.name)
+                sftp_client.get(sftp_path, f.name)
+                s3_hook.load_file(
+                    filename=f.name,
+                    key=s3_key,
+                    bucket_name=self.s3_bucket,
+                    replace=self.replace,
+                    encrypt=self.encrypt,
+                    gzip=self.gzip,
+                    acl_policy=self.acl_policy,
+                )
+        else:
+            # upload_fileobj always overwrites, so honor ``replace`` explicitly (as load_file does).
+            if not self.replace and s3_hook.check_for_key(s3_key, self.s3_bucket):
+                raise ValueError(f"The key {s3_key} already exists.")
+            extra_args: dict = {}
+            if self.encrypt:
+                extra_args["ServerSideEncryption"] = "AES256"
+            if self.acl_policy:
+                extra_args["ACL"] = self.acl_policy
+            with sftp_client.file(sftp_path, mode="rb") as data:
+                s3_hook.get_conn().upload_fileobj(
+                    data, self.s3_bucket, s3_key, ExtraArgs=extra_args or None, Callback=self.log.info
+                )
+
+    def execute(self, context: Context) -> None:
+        self.s3_key = self.get_s3_key(self.s3_key)
+
+        # SSHHook will handle a None/"" sftp_remote_host
+        ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id, remote_host=self.sftp_remote_host)
+        s3_hook = S3Hook(self.aws_conn_id)
+        sftp_client = ssh_hook.get_conn().open_sftp()
 
-                s3_hook.load_file(filename=f.name, key=self.s3_key, bucket_name=self.s3_bucket, replace=True)
+        if self.sftp_filenames:
+            # Normalize the directory so both "dir" and "dir/" yield "dir/<name>" in every branch.
+            sftp_dir = self.sftp_path.rstrip("/")
+            if isinstance(self.sftp_filenames, str):
+                self.log.info("Getting files in %s", self.sftp_path)
+                list_dir = sftp_client.listdir(self.sftp_path)
+                if self.sftp_filenames == "*":
+                    files = list_dir
+                else:
+                    sftp_prefix: str = self.sftp_filenames
+                    files = [f for f in list_dir if sftp_prefix in f]
+
+                for file in files:
+                    self.log.info("Moving file %s", file)
+                    if self.s3_filenames and isinstance(self.s3_filenames, str):
+                        s3_filename = file.replace(self.sftp_filenames, self.s3_filenames)
+                    else:
+                        s3_filename = file
+                    self._upload_to_s3(
+                        sftp_client,
+                        s3_hook,
+                        f"{sftp_dir}/{file}",
+                        f"{self.s3_key}{s3_filename}",
+                    )
+            else:
+                if self.s3_filenames:
+                    for sftp_file, s3_file in zip(self.sftp_filenames, self.s3_filenames, strict=True):
+                        self._upload_to_s3(
+                            sftp_client,
+                            s3_hook,
+                            f"{sftp_dir}/{sftp_file}",
+                            self.s3_key + s3_file,
+                        )
+                else:
+                    for sftp_file in self.sftp_filenames:
+                        self._upload_to_s3(
+                            sftp_client,
+                            s3_hook,
+                            f"{sftp_dir}/{sftp_file}",
+                            self.s3_key + sftp_file,
+                        )
         else:
-            with sftp_client.file(self.sftp_path, mode="rb") as data:
-                s3_hook.get_conn().upload_fileobj(data, self.s3_bucket, self.s3_key, Callback=self.log.info)
+            self._upload_to_s3(sftp_client, s3_hook, self.sftp_path, self.s3_key)
diff --git a/providers/amazon/tests/unit/amazon/aws/transfers/test_ftp_to_s3.py b/providers/amazon/tests/unit/amazon/aws/transfers/test_ftp_to_s3.py
index 757a3964641..102969b33c8 100644
--- a/providers/amazon/tests/unit/amazon/aws/transfers/test_ftp_to_s3.py
+++ b/providers/amazon/tests/unit/amazon/aws/transfers/test_ftp_to_s3.py
@@ -17,7 +17,11 @@
 # under the License.
 from __future__ import annotations
 
+import ftplib
 from unittest import mock
+from unittest.mock import MagicMock, patch
+
+import pytest
 
 from airflow.providers.amazon.aws.transfers.ftp_to_s3 import FTPToS3Operator
 
@@ -128,3 +132,36 @@ class TestFTPToS3Operator:
         operator.execute(None)
 
         mock_ftp_hook_list_directory.assert_called_once_with(path=FTP_PATH_MULTIPLE)
+
+
+class TestFTPToS3OperatorInit:
+    """Unit tests for FTPToS3Operator that do not require an FTP server."""
+
+    def test_fail_on_file_not_exist_default(self):
+        """fail_on_file_not_exist defaults to True."""
+        op = FTPToS3Operator(task_id="test_fail_default", s3_bucket=BUCKET, s3_key=S3_KEY, ftp_path=FTP_PATH)
+        assert op.fail_on_file_not_exist is True
+
+    @pytest.mark.parametrize("fail_on_file_not_exist", [True, False])
+    def test_fail_on_file_not_exist_skip(self, fail_on_file_not_exist):
+        """When FTP file is missing (error_perm 550): raise if True, skip if False."""
+        op = FTPToS3Operator(
+            task_id="test_skip",
+            s3_bucket=BUCKET,
+            s3_key=S3_KEY,
+            ftp_path=FTP_PATH,
+            fail_on_file_not_exist=fail_on_file_not_exist,
+        )
+        op.ftp_hook = MagicMock()
+        op.s3_hook = MagicMock()
+        op.ftp_hook.retrieve_file.side_effect = ftplib.error_perm("550 No such file or directory")
+
+        if fail_on_file_not_exist:
+            with pytest.raises(ftplib.error_perm):
+                op._FTPToS3Operator__upload_to_s3_from_ftp(FTP_PATH, S3_KEY)
+        else:
+            with patch.object(op.log, "info") as mock_log:
+                op._FTPToS3Operator__upload_to_s3_from_ftp(FTP_PATH, S3_KEY)
+            mock_log.assert_called_once()
+        # Nothing may be uploaded to S3 when the source file is missing.
+        op.s3_hook.load_file.assert_not_called()
diff --git a/providers/amazon/tests/unit/amazon/aws/transfers/test_s3_to_ftp.py 
b/providers/amazon/tests/unit/amazon/aws/transfers/test_s3_to_ftp.py
index 6308d34ac02..899708c533f 100644
--- a/providers/amazon/tests/unit/amazon/aws/transfers/test_s3_to_ftp.py
+++ b/providers/amazon/tests/unit/amazon/aws/transfers/test_s3_to_ftp.py
@@ -19,6 +19,8 @@ from __future__ import annotations
 
 from unittest import mock
 
+import pytest
+
 from airflow.providers.amazon.aws.transfers.s3_to_ftp import S3ToFTPOperator
 
 TASK_ID = "test_s3_to_ftp"
@@ -32,8 +34,11 @@ FTP_CONN_ID = "ftp_default"
 class TestS3ToFTPOperator:
     @mock.patch("airflow.providers.ftp.hooks.ftp.FTPHook.store_file")
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_key")
+    @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.check_for_key", 
return_value=True)
     
@mock.patch("airflow.providers.amazon.aws.transfers.s3_to_ftp.NamedTemporaryFile")
-    def test_execute(self, mock_local_tmp_file, mock_s3_hook_get_key, 
mock_ftp_hook_store_file):
+    def test_execute(
+        self, mock_local_tmp_file, mock_check_for_key, mock_s3_hook_get_key, 
mock_ftp_hook_store_file
+    ):
         operator = S3ToFTPOperator(task_id=TASK_ID, s3_bucket=BUCKET, 
s3_key=S3_KEY, ftp_path=FTP_PATH)
         operator.execute(None)
 
@@ -42,3 +47,57 @@ class TestS3ToFTPOperator:
         mock_local_tmp_file_value = mock_local_tmp_file.return_value.__enter__.return_value
         mock_s3_hook_get_key.return_value.download_fileobj.assert_called_once_with(mock_local_tmp_file_value)
         mock_ftp_hook_store_file.assert_called_once_with(operator.ftp_path, mock_local_tmp_file_value.name)
+
+
+class TestS3ToFTPOperatorInit:
+    """Unit tests for S3ToFTPOperator.__init__ that do not require an FTP server."""
+
+    @pytest.mark.parametrize(
+        ("s3_filenames", "ftp_filenames"),
+        [
+            (None, None),
+            ("*", None),
+            ("prefix_", "renamed_"),
+            (["a.csv", "b.csv"], ["x.csv", "y.csv"]),
+        ],
+    )
+    def test_multi_file_params(self, s3_filenames, ftp_filenames):
+        """s3_filenames and ftp_filenames are stored correctly."""
+        op = S3ToFTPOperator(
+            task_id="test_multi",
+            s3_bucket=BUCKET,
+            s3_key=S3_KEY,
+            ftp_path=FTP_PATH,
+            s3_filenames=s3_filenames,
+            ftp_filenames=ftp_filenames,
+        )
+        assert op.s3_filenames == s3_filenames
+        assert op.ftp_filenames == ftp_filenames
+
+    def test_fail_on_file_not_exist_default(self):
+        """fail_on_file_not_exist defaults to True."""
+        op = S3ToFTPOperator(task_id="test_fail_default", s3_bucket=BUCKET, s3_key=S3_KEY, ftp_path=FTP_PATH)
+        assert op.fail_on_file_not_exist is True
+
+    @pytest.mark.parametrize("fail_on_file_not_exist", [True, False])
+    def test_fail_on_file_not_exist_skip(self, fail_on_file_not_exist):
+        """When key is missing: raise FileNotFoundError if True, skip if False."""
+        from unittest.mock import MagicMock, patch
+
+        op = S3ToFTPOperator(
+            task_id="test_skip",
+            s3_bucket=BUCKET,
+            s3_key=S3_KEY,
+            ftp_path=FTP_PATH,
+            fail_on_file_not_exist=fail_on_file_not_exist,
+        )
+        mock_s3_hook = MagicMock()
+        mock_s3_hook.check_for_key.return_value = False
+
+        if fail_on_file_not_exist:
+            with pytest.raises(FileNotFoundError):
+                op._download_from_s3(mock_s3_hook, MagicMock(), S3_KEY, FTP_PATH)
+        else:
+            with patch.object(op.log, "info") as mock_log:
+                op._download_from_s3(mock_s3_hook, MagicMock(), S3_KEY, FTP_PATH)
+            mock_log.assert_called_once()
diff --git a/providers/amazon/tests/unit/amazon/aws/transfers/test_s3_to_sftp.py b/providers/amazon/tests/unit/amazon/aws/transfers/test_s3_to_sftp.py
index 257b898922c..867c7336ae9 100644
--- a/providers/amazon/tests/unit/amazon/aws/transfers/test_s3_to_sftp.py
+++ b/providers/amazon/tests/unit/amazon/aws/transfers/test_s3_to_sftp.py
@@ -313,3 +313,65 @@ class TestS3ToSFTPOperator:
 
     def teardown_method(self):
         self.delete_remote_resource()
+
+
+class TestS3ToSFTPOperatorInit:
+    """Unit tests for S3ToSFTPOperator.__init__ that do not require an SSH server."""
+
+    @pytest.mark.parametrize(
+        ("s3_filenames", "sftp_filenames"),
+        [
+            (None, None),
+            ("*", None),
+            ("prefix_", "renamed_"),
+            (["a.csv", "b.csv"], ["x.csv", "y.csv"]),
+        ],
+    )
+    def test_multi_file_params(self, s3_filenames, sftp_filenames):
+        """s3_filenames and sftp_filenames are stored correctly."""
+        op = S3ToSFTPOperator(
+            task_id="test_multi",
+            s3_bucket=BUCKET,
+            s3_key=S3_KEY,
+            sftp_path=SFTP_PATH,
+            sftp_conn_id=SFTP_CONN_ID,
+            s3_filenames=s3_filenames,
+            sftp_filenames=sftp_filenames,
+        )
+        assert op.s3_filenames == s3_filenames
+        assert op.sftp_filenames == sftp_filenames
+
+    def test_fail_on_file_not_exist_default(self):
+        """fail_on_file_not_exist defaults to True."""
+        op = S3ToSFTPOperator(
+            task_id="test_fail_default",
+            s3_bucket=BUCKET,
+            s3_key=S3_KEY,
+            sftp_path=SFTP_PATH,
+            sftp_conn_id=SFTP_CONN_ID,
+        )
+        assert op.fail_on_file_not_exist is True
+
+    @pytest.mark.parametrize("fail_on_file_not_exist", [True, False])
+    def test_fail_on_file_not_exist_skip(self, fail_on_file_not_exist):
+        """When key is missing: raise FileNotFoundError if True, skip if False."""
+        from unittest.mock import MagicMock, patch
+
+        op = S3ToSFTPOperator(
+            task_id="test_skip",
+            s3_bucket=BUCKET,
+            s3_key=S3_KEY,
+            sftp_path=SFTP_PATH,
+            sftp_conn_id=SFTP_CONN_ID,
+            fail_on_file_not_exist=fail_on_file_not_exist,
+        )
+        mock_s3_hook = MagicMock()
+        mock_s3_hook.check_for_key.return_value = False
+
+        if fail_on_file_not_exist:
+            with pytest.raises(FileNotFoundError):
+                op._download_from_s3(MagicMock(), mock_s3_hook, S3_KEY, SFTP_PATH)
+        else:
+            with patch.object(op.log, "info") as mock_log:
+                op._download_from_s3(MagicMock(), mock_s3_hook, S3_KEY, SFTP_PATH)
+            mock_log.assert_called_once()
diff --git a/providers/amazon/tests/unit/amazon/aws/transfers/test_sftp_to_s3.py b/providers/amazon/tests/unit/amazon/aws/transfers/test_sftp_to_s3.py
index feb85e33a3c..73ad1bae0c3 100644
--- a/providers/amazon/tests/unit/amazon/aws/transfers/test_sftp_to_s3.py
+++ b/providers/amazon/tests/unit/amazon/aws/transfers/test_sftp_to_s3.py
@@ -17,10 +17,13 @@
 # under the License.
 from __future__ import annotations
 
+import warnings
+
 import boto3
 import pytest
 from moto import mock_aws
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.models import DAG
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.transfers.sftp_to_s3 import SFTPToS3Operator
@@ -99,7 +102,7 @@ class TestSFTPToS3Operator:
             s3_key=S3_KEY,
             sftp_path=SFTP_PATH,
             sftp_conn_id=SFTP_CONN_ID,
-            s3_conn_id=S3_CONN_ID,
+            aws_conn_id=S3_CONN_ID,
             use_temp_file=use_temp_file,
             task_id="test_sftp_to_s3",
             dag=self.dag,
@@ -137,7 +140,7 @@ class TestSFTPToS3Operator:
                     s3_key=self.s3_key,
                     sftp_path="/tmp/wrong_path.txt",
                     sftp_conn_id=SFTP_CONN_ID,
-                    s3_conn_id=S3_CONN_ID,
+                    aws_conn_id=S3_CONN_ID,
                     fail_on_file_not_exist=fail_on_file_not_exist,
                     task_id="test_sftp_to_s3",
                     dag=self.dag,
@@ -148,7 +151,7 @@ class TestSFTPToS3Operator:
                 s3_key=self.s3_key,
                 sftp_path=self.sftp_path,
                 sftp_conn_id=SFTP_CONN_ID,
-                s3_conn_id=S3_CONN_ID,
+                aws_conn_id=S3_CONN_ID,
                 fail_on_file_not_exist=fail_on_file_not_exist,
                 task_id="test_sftp_to_s3",
                 dag=self.dag,
@@ -191,7 +194,7 @@ class TestSFTPToS3Operator:
             sftp_path=SFTP_PATH,
             sftp_conn_id=SFTP_CONN_ID,
             sftp_remote_host="localhost",
-            s3_conn_id=S3_CONN_ID,
+            aws_conn_id=S3_CONN_ID,
             task_id="test_sftp_to_s3_remote_host",
             dag=self.dag,
         )
@@ -208,3 +211,96 @@ class TestSFTPToS3Operator:
         conn.delete_object(Bucket=self.s3_bucket, Key=self.s3_key)
         conn.delete_bucket(Bucket=self.s3_bucket)
         assert not s3_hook.check_for_bucket(self.s3_bucket)
+
+
+class TestSFTPToS3OperatorInit:
+    """Unit tests for SFTPToS3Operator.__init__ that do not require an SSH server."""
+
+    def test_s3_conn_id_deprecated(self):
+        """s3_conn_id is a deprecated alias for aws_conn_id and must raise DeprecationWarning."""
+        with warnings.catch_warnings(record=True) as caught:
+            warnings.simplefilter("always")
+            op = SFTPToS3Operator(
+                task_id="test_deprecated",
+                s3_bucket=BUCKET,
+                s3_key=S3_KEY,
+                sftp_path=SFTP_PATH,
+                sftp_conn_id=SFTP_CONN_ID,
+                s3_conn_id="my_legacy_conn",
+            )
+        deprecation_warnings = [
+            w for w in caught if issubclass(w.category, AirflowProviderDeprecationWarning)
+        ]
+        assert len(deprecation_warnings) == 1
+        assert "s3_conn_id" in str(deprecation_warnings[0].message)
+        assert op.aws_conn_id == "my_legacy_conn"
+
+    def test_aws_conn_id_default(self):
+        """aws_conn_id defaults to 'aws_default' and no AirflowProviderDeprecationWarning is raised."""
+        with warnings.catch_warnings(record=True) as caught:
+            warnings.simplefilter("always")
+            op = SFTPToS3Operator(
+                task_id="test_default",
+                s3_bucket=BUCKET,
+                s3_key=S3_KEY,
+                sftp_path=SFTP_PATH,
+                sftp_conn_id=SFTP_CONN_ID,
+            )
+        deprecation_warnings = [
+            w for w in caught if issubclass(w.category, AirflowProviderDeprecationWarning)
+        ]
+        assert not deprecation_warnings
+        assert op.aws_conn_id == "aws_default"
+
+    @pytest.mark.parametrize(
+        ("kwargs", "expected"),
+        [
+            ({}, {"replace": False, "encrypt": False, "gzip": False, "acl_policy": None}),
+            (
+                {"replace": True, "encrypt": True, "gzip": True, "acl_policy": "bucket-owner-full-control"},
+                {
+                    "replace": True,
+                    "encrypt": True,
+                    "gzip": True,
+                    "acl_policy": "bucket-owner-full-control",
+                },
+            ),
+        ],
+    )
+    def test_s3_upload_options(self, kwargs, expected):
+        """replace/encrypt/gzip/acl_policy are stored and default to False/None."""
+        op = SFTPToS3Operator(
+            task_id="test_options",
+            s3_bucket=BUCKET,
+            s3_key=S3_KEY,
+            sftp_path=SFTP_PATH,
+            sftp_conn_id=SFTP_CONN_ID,
+            **kwargs,
+        )
+        assert op.replace == expected["replace"]
+        assert op.encrypt == expected["encrypt"]
+        assert op.gzip == expected["gzip"]
+        assert op.acl_policy == expected["acl_policy"]
+
+    @pytest.mark.parametrize(
+        ("sftp_filenames", "s3_filenames"),
+        [
+            (None, None),
+            ("*", None),
+            ("prefix_", "renamed_"),
+            (["a.csv", "b.csv"], ["x.csv", "y.csv"]),
+        ],
+    )
+    def test_multi_file_params(self, sftp_filenames, s3_filenames):
+        """sftp_filenames and s3_filenames are stored correctly."""
+        op = SFTPToS3Operator(
+            task_id="test_multi",
+            s3_bucket=BUCKET,
+            s3_key=S3_KEY,
+            sftp_path=SFTP_PATH,
+            sftp_conn_id=SFTP_CONN_ID,
+            sftp_filenames=sftp_filenames,
+            s3_filenames=s3_filenames,
+        )
+        assert op.sftp_filenames == sftp_filenames
+        assert op.s3_filenames == s3_filenames


Reply via email to