This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e491aac92a1 fix(providers/amazon): S3DagBundle does not delete stale 
dag recursively (#63104)
e491aac92a1 is described below

commit e491aac92a1d50f322a08d92d83616e7c79b3f2e
Author: jerry <[email protected]>
AuthorDate: Tue Mar 10 00:46:10 2026 +0800

    fix(providers/amazon): S3DagBundle does not delete stale dag recursively 
(#63104)
    
    * Refactor S3Hook's local file synchronization logic to match GCSHook's. 
Update tests to cover nested directories and ensure proper logging of deleted 
files and directories.
    
    * Update S3Hook logging level for deleted files and directories from info 
to debug to reduce log verbosity.
---
 .../src/airflow/providers/amazon/aws/hooks/s3.py   | 34 +++++++++-------------
 .../amazon/tests/unit/amazon/aws/hooks/test_s3.py  | 14 +++++++++
 2 files changed, 27 insertions(+), 21 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py 
b/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py
index f9beb0d0400..d0f8ce8b784 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py
@@ -1735,27 +1735,19 @@ class S3Hook(AwsBaseHook):
         s3_client.delete_bucket_tagging(Bucket=bucket_name)
 
     def _sync_to_local_dir_delete_stale_local_files(self, current_s3_objects: 
list[Path], local_dir: Path):
-        current_s3_keys = {key for key in current_s3_objects}
-
-        for item in local_dir.iterdir():
-            item: Path  # type: ignore[no-redef]
-            absolute_item_path = item.resolve()
-
-            if absolute_item_path not in current_s3_keys:
-                try:
-                    if item.is_file():
-                        item.unlink(missing_ok=True)
-                        self.log.debug("Deleted stale local file: %s", item)
-                    elif item.is_dir():
-                        # delete only when the folder is empty
-                        if not os.listdir(item):
-                            item.rmdir()
-                            self.log.debug("Deleted stale empty directory: 
%s", item)
-                    else:
-                        self.log.debug("Skipping stale item of unknown type: 
%s", item)
-                except OSError as e:
-                    self.log.error("Error deleting stale item %s: %s", item, e)
-                    raise e
+        current_s3_keys = {key.resolve() for key in current_s3_objects}
+
+        for item in local_dir.rglob("*"):
+            if item.is_file() and item.resolve() not in current_s3_keys:
+                self.log.debug("Deleted stale local file: %s", item)
+                item.unlink()
+        # Clean up empty directories
+        for root, dirs, _ in os.walk(local_dir, topdown=False):
+            for d in dirs:
+                dir_path = os.path.join(root, d)
+                if not os.listdir(dir_path):
+                    self.log.debug("Deleted stale empty directory: %s", 
dir_path)
+                    os.rmdir(dir_path)
 
     def _sync_to_local_dir_if_changed(self, s3_bucket, s3_object, 
local_target_path: Path):
         should_download = False
diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py 
b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py
index 14371da9cb4..99fd45aff89 100644
--- a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py
+++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py
@@ -1870,14 +1870,28 @@ class TestAwsS3Hook:
         local_file_that_should_be_deleted.write_text("test dag")
         local_folder_should_be_deleted = 
Path(sync_local_dir).joinpath("local_folder_should_be_deleted")
         local_folder_should_be_deleted.mkdir(exist_ok=True)
+        nested_stale_file = Path(sync_local_dir).joinpath("subproject1", 
"stale_nested.py")
+        nested_stale_file.write_text("stale nested file")
+        deep_nested_dir = Path(sync_local_dir).joinpath("subproject1", "deep")
+        deep_nested_dir.mkdir()
+        deep_stale_file = deep_nested_dir.joinpath("stale_deep.py")
+        deep_stale_file.write_text("stale deep file")
         hook.log.debug = MagicMock()
         hook.sync_to_local_dir(
             bucket_name=s3_bucket, local_dir=sync_local_dir, s3_prefix="", 
delete_stale=True
         )
         logs_string = get_logs_string(hook.log.debug.call_args_list)
         assert f"Deleted stale local file: 
{local_file_that_should_be_deleted.as_posix()}" in logs_string
+        assert f"Deleted stale local file: {nested_stale_file.as_posix()}" in 
logs_string
+        assert f"Deleted stale local file: {deep_stale_file.as_posix()}" in 
logs_string
 
         assert f"Deleted stale empty directory: 
{local_folder_should_be_deleted.as_posix()}" in logs_string
+        assert f"Deleted stale empty directory: {deep_nested_dir.as_posix()}" 
in logs_string
+        assert not nested_stale_file.exists()
+        assert not deep_stale_file.exists()
+        assert not deep_nested_dir.exists()
+        assert Path(sync_local_dir).joinpath("dag_01.py").exists()
+        assert Path(sync_local_dir).joinpath("subproject1", 
"dag_a.py").exists()
 
         s3_client.put_object(Bucket=s3_bucket, Key="dag_03.py", Body=b"test 
data-changed")
         hook.log.debug = MagicMock()

Reply via email to