This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2e1422be30e Add fail_on_file_not_exist to SFTPToGCSOperator (#56528)
2e1422be30e is described below
commit 2e1422be30e54a02f5e9af9ce02f0bb686b2fb0f
Author: John Nguyen <[email protected]>
AuthorDate: Mon Oct 20 03:39:07 2025 +0700
Add fail_on_file_not_exist to SFTPToGCSOperator (#56528)
* Add fail_on_file_not_exist to SFTPToGCSOperator
* working with local DAG
* working with local DAG
* Trigger Build
* fix test
* fix unit test case
---------
Co-authored-by: John Nguyen <[email protected]>
---
.../google/cloud/transfers/sftp_to_gcs.py | 13 ++++++++++--
.../google/cloud/transfers/test_sftp_to_gcs.py | 24 ++++++++++++++++++++++
2 files changed, 35 insertions(+), 2 deletions(-)
diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/sftp_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/sftp_to_gcs.py
index 9e53d16f943..8bd6af7f991 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/sftp_to_gcs.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/sftp_to_gcs.py
@@ -78,6 +78,8 @@ class SFTPToGCSOperator(BaseOperator):
then uploads (may require significant disk space).
When ``True``, the file streams directly without using local disk.
Defaults to ``False``.
+ :param fail_on_file_not_exist: If True, operator fails when file does not exist,
+ if False, operator will not fail and skips transfer. Default is True.
"""
template_fields: Sequence[str] = (
@@ -101,6 +103,7 @@ class SFTPToGCSOperator(BaseOperator):
impersonation_chain: str | Sequence[str] | None = None,
sftp_prefetch: bool = True,
use_stream: bool = False,
+ fail_on_file_not_exist: bool = True,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -116,6 +119,7 @@ class SFTPToGCSOperator(BaseOperator):
self.impersonation_chain = impersonation_chain
self.sftp_prefetch = sftp_prefetch
self.use_stream = use_stream
+ self.fail_on_file_not_exist = fail_on_file_not_exist
@cached_property
def sftp_hook(self):
@@ -156,7 +160,13 @@ class SFTPToGCSOperator(BaseOperator):
destination_object = (
self.destination_path if self.destination_path else self.source_path.rsplit("/", 1)[1]
)
- self._copy_single_object(gcs_hook, self.sftp_hook, self.source_path, destination_object)
+ try:
+ self._copy_single_object(gcs_hook, self.sftp_hook, self.source_path, destination_object)
+ except FileNotFoundError as e:
+ if self.fail_on_file_not_exist:
+ raise e
+ self.log.info("File %s not found on SFTP server. Skipping transfer.", self.source_path)
+ return
def _copy_single_object(
self,
@@ -172,7 +182,6 @@ class SFTPToGCSOperator(BaseOperator):
self.destination_bucket,
destination_object,
)
-
if self.use_stream:
dest_bucket = gcs_hook.get_bucket(self.destination_bucket)
dest_blob = dest_bucket.blob(destination_object)
diff --git a/providers/google/tests/unit/google/cloud/transfers/test_sftp_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_sftp_to_gcs.py
index f15d1639744..d84f889b1e8 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_sftp_to_gcs.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_sftp_to_gcs.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import os
from unittest import mock
+from unittest.mock import patch
import pytest
@@ -377,3 +378,26 @@ class TestSFTPToGCSOperator:
assert result.inputs[0].name == expected_source
assert result.outputs[0].namespace == f"gs://{TEST_BUCKET}"
assert result.outputs[0].name == expected_destination
+
+ @pytest.mark.parametrize("fail_on_file_not_exist", [False, True])
+ @mock.patch("airflow.providers.google.cloud.transfers.sftp_to_gcs.GCSHook")
+ @mock.patch("airflow.providers.google.cloud.transfers.sftp_to_gcs.SFTPHook")
+ def test_sftp_to_gcs_fail_on_file_not_exist(self, sftp_hook, gcs_hook, fail_on_file_not_exist):
+ invalid_file_name = "main_dir/invalid-object.json"
+ task = SFTPToGCSOperator(
+ task_id=TASK_ID,
+ source_path=invalid_file_name,
+ destination_bucket=TEST_BUCKET,
+ destination_path=DESTINATION_PATH_FILE,
+ move_object=False,
+ gcp_conn_id=GCP_CONN_ID,
+ sftp_conn_id=SFTP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ fail_on_file_not_exist=fail_on_file_not_exist,
+ )
+ with patch.object(sftp_hook.return_value, "retrieve_file", side_effect=FileNotFoundError):
+ if fail_on_file_not_exist:
+ with pytest.raises(FileNotFoundError):
+ task.execute(None)
+ else:
+ task.execute(None)