This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 567ad643c95 issue-57891: Adding sftp_remote_host to S3 transfer Operators (#63147)
567ad643c95 is described below
commit 567ad643c9525e0cd08d1e9a912436e50b568027
Author: Jake Roach <[email protected]>
AuthorDate: Mon Mar 9 12:37:17 2026 -0400
issue-57891: Adding sftp_remote_host to S3 transfer Operators (#63147)
---
.../providers/amazon/aws/transfers/s3_to_sftp.py | 8 +++-
.../providers/amazon/aws/transfers/sftp_to_s3.py | 8 +++-
.../unit/amazon/aws/transfers/test_s3_to_sftp.py | 55 ++++++++++++++++++++++
.../unit/amazon/aws/transfers/test_sftp_to_s3.py | 51 ++++++++++++++++++++
4 files changed, 120 insertions(+), 2 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sftp.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sftp.py
index 0b3b5e1cb84..87d8454af96 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sftp.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sftp.py
@@ -42,6 +42,8 @@ class S3ToSFTPOperator(BaseOperator):
establishing a connection to the SFTP server.
:param sftp_path: The sftp remote path. This is the specified file path for
uploading file to the SFTP server.
+ :param sftp_remote_host: The remote host of the SFTP server. Overrides host in
+ Connection.
:param aws_conn_id: The Airflow connection used for AWS credentials.
If this is None or empty then the default boto3 behaviour is used. If
running Airflow in a distributed manner and aws_conn_id is None or
@@ -65,6 +67,7 @@ class S3ToSFTPOperator(BaseOperator):
s3_key: str,
sftp_path: str,
sftp_conn_id: str = "ssh_default",
+ sftp_remote_host: str = "",
aws_conn_id: str | None = "aws_default",
confirm: bool = True,
**kwargs,
@@ -74,6 +77,7 @@ class S3ToSFTPOperator(BaseOperator):
self.sftp_path = sftp_path
self.s3_bucket = s3_bucket
self.s3_key = s3_key
+ self.sftp_remote_host = sftp_remote_host
self.aws_conn_id = aws_conn_id
self.confirm = confirm
@@ -85,7 +89,9 @@ class S3ToSFTPOperator(BaseOperator):
def execute(self, context: Context) -> None:
self.s3_key = self.get_s3_key(self.s3_key)
- ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id)
+
+ # SSHHook will handle a None/"" sftp_remote_host
+ ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id, remote_host=self.sftp_remote_host)
s3_hook = S3Hook(self.aws_conn_id)
s3_client = s3_hook.get_conn()
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
index 0b3aada1950..4897ccca25c 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
@@ -40,6 +40,8 @@ class SFTPToS3Operator(BaseOperator):
:param sftp_conn_id: The sftp connection id. The name or identifier for
establishing a connection to the SFTP server.
+ :param sftp_remote_host: The remote host of the SFTP server. Overrides host in
+ Connection.
:param sftp_path: The sftp remote path. This is the specified file path
for downloading the file from the SFTP server.
:param s3_conn_id: The s3 connection id. The name or identifier for
@@ -63,6 +65,7 @@ class SFTPToS3Operator(BaseOperator):
s3_key: str,
sftp_path: str,
sftp_conn_id: str = "ssh_default",
+ sftp_remote_host: str = "",
s3_conn_id: str = "aws_default",
use_temp_file: bool = True,
fail_on_file_not_exist: bool = True,
@@ -71,6 +74,7 @@ class SFTPToS3Operator(BaseOperator):
super().__init__(**kwargs)
self.sftp_conn_id = sftp_conn_id
self.sftp_path = sftp_path
+ self.sftp_remote_host = sftp_remote_host
self.s3_bucket = s3_bucket
self.s3_key = s3_key
self.s3_conn_id = s3_conn_id
@@ -85,7 +89,9 @@ class SFTPToS3Operator(BaseOperator):
def execute(self, context: Context) -> None:
self.s3_key = self.get_s3_key(self.s3_key)
- ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id)
+
+ # SSHHook will handle a None/"" sftp_remote_host
+ ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id, remote_host=self.sftp_remote_host)
s3_hook = S3Hook(self.s3_conn_id)
sftp_client = ssh_hook.get_conn().open_sftp()
diff --git a/providers/amazon/tests/unit/amazon/aws/transfers/test_s3_to_sftp.py b/providers/amazon/tests/unit/amazon/aws/transfers/test_s3_to_sftp.py
index fecf207c6f4..257b898922c 100644
--- a/providers/amazon/tests/unit/amazon/aws/transfers/test_s3_to_sftp.py
+++ b/providers/amazon/tests/unit/amazon/aws/transfers/test_s3_to_sftp.py
@@ -256,5 +256,60 @@ class TestS3ToSFTPOperator:
conn.delete_bucket(Bucket=self.s3_bucket)
assert not s3_hook.check_for_bucket(self.s3_bucket)
+ @mock_aws
+ @conf_vars({("core", "enable_xcom_pickling"): "True"})
+ def test_s3_to_sftp_operator_sftp_remote_host(self):
+ """Test that sftp_remote_host overrides the connection host when provided."""
+ s3_hook = S3Hook(aws_conn_id=None)
+ test_remote_file_content = (
+ "This is remote file content for sftp_remote_host test \n which is also multiline "
+ "another line here \n this is last line. EOF"
+ )
+
+ # Test for creation of s3 bucket
+ conn = boto3.client("s3")
+ conn.create_bucket(Bucket=self.s3_bucket)
+ assert s3_hook.check_for_bucket(self.s3_bucket)
+
+ with open(LOCAL_FILE_PATH, "w") as file:
+ file.write(test_remote_file_content)
+ s3_hook.load_file(LOCAL_FILE_PATH, self.s3_key, bucket_name=BUCKET)
+
+ # Check if object was created in s3
+ objects_in_dest_bucket = conn.list_objects(Bucket=self.s3_bucket,
Prefix=self.s3_key)
+ assert len(objects_in_dest_bucket["Contents"]) == 1
+ assert objects_in_dest_bucket["Contents"][0]["Key"] == self.s3_key
+
+ # Execute with sftp_remote_host overriding the connection host to the same localhost
+ run_task = S3ToSFTPOperator(
+ s3_bucket=BUCKET,
+ s3_key=S3_KEY,
+ sftp_path=SFTP_PATH,
+ sftp_conn_id=SFTP_CONN_ID,
+ sftp_remote_host="localhost",
+ task_id=TASK_ID + "_remote_host",
+ dag=self.dag,
+ )
+ assert run_task is not None
+
+ run_task.execute(None)
+
+ # Check that the file is created remotely with correct content
+ check_file_task = SSHOperator(
+ task_id="test_check_file_remote_host",
+ ssh_hook=self.hook,
+ command=f"cat {self.sftp_path}",
+ do_xcom_push=True,
+ dag=self.dag,
+ )
+ assert check_file_task is not None
+ result = check_file_task.execute(None)
+ assert result.strip() == test_remote_file_content.encode("utf-8")
+
+ # Clean up after finishing with test
+ conn.delete_object(Bucket=self.s3_bucket, Key=self.s3_key)
+ conn.delete_bucket(Bucket=self.s3_bucket)
+ assert not s3_hook.check_for_bucket(self.s3_bucket)
+
def teardown_method(self):
self.delete_remote_resource()
diff --git a/providers/amazon/tests/unit/amazon/aws/transfers/test_sftp_to_s3.py b/providers/amazon/tests/unit/amazon/aws/transfers/test_sftp_to_s3.py
index e8fd3be4905..feb85e33a3c 100644
--- a/providers/amazon/tests/unit/amazon/aws/transfers/test_sftp_to_s3.py
+++ b/providers/amazon/tests/unit/amazon/aws/transfers/test_sftp_to_s3.py
@@ -157,3 +157,54 @@ class TestSFTPToS3Operator:
conn.delete_object(Bucket=self.s3_bucket, Key=self.s3_key)
conn.delete_bucket(Bucket=self.s3_bucket)
assert not s3_hook.check_for_bucket(self.s3_bucket)
+
+ @mock_aws
+ @conf_vars({("core", "enable_xcom_pickling"): "True"})
+ def test_sftp_to_s3_sftp_remote_host(self):
+ """Test that sftp_remote_host overrides the connection host when provided."""
+ test_remote_file_content = (
+ "This is remote file content for sftp_remote_host test \n which is also multiline "
+ "another line here \n this is last line. EOF"
+ )
+
+ # Create a test file remotely
+ create_file_task = SSHOperator(
+ task_id="test_create_file_remote_host",
+ ssh_hook=self.hook,
+ command=f"echo '{test_remote_file_content}' > {self.sftp_path}",
+ do_xcom_push=True,
+ dag=self.dag,
+ )
+ assert create_file_task is not None
+ create_file_task.execute(None)
+
+ # Test for creation of s3 bucket
+ s3_hook = S3Hook(aws_conn_id=None)
+ conn = boto3.client("s3")
+ conn.create_bucket(Bucket=self.s3_bucket)
+ assert s3_hook.check_for_bucket(self.s3_bucket)
+
+ # Execute with sftp_remote_host overriding the connection host to the same localhost
+ run_task = SFTPToS3Operator(
+ s3_bucket=BUCKET,
+ s3_key=S3_KEY,
+ sftp_path=SFTP_PATH,
+ sftp_conn_id=SFTP_CONN_ID,
+ sftp_remote_host="localhost",
+ s3_conn_id=S3_CONN_ID,
+ task_id="test_sftp_to_s3_remote_host",
+ dag=self.dag,
+ )
+ assert run_task is not None
+
+ run_task.execute(None)
+
+ # Check if object was created in s3
+ objects_in_dest_bucket = conn.list_objects(Bucket=self.s3_bucket, Prefix=self.s3_key)
+ assert len(objects_in_dest_bucket["Contents"]) == 1
+ assert objects_in_dest_bucket["Contents"][0]["Key"] == self.s3_key
+
+ # Clean up after finishing with test
+ conn.delete_object(Bucket=self.s3_bucket, Key=self.s3_key)
+ conn.delete_bucket(Bucket=self.s3_bucket)
+ assert not s3_hook.check_for_bucket(self.s3_bucket)