This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e491aac92a1 fix(providers/amazon): S3DagBundle does not delete stale
dag recursively (#63104)
e491aac92a1 is described below
commit e491aac92a1d50f322a08d92d83616e7c79b3f2e
Author: jerry <[email protected]>
AuthorDate: Tue Mar 10 00:46:10 2026 +0800
fix(providers/amazon): S3DagBundle does not delete stale dag recursively
(#63104)
* Refactor S3Hook's local file synchronization logic to match GCSHook.
Update tests to cover nested directories and ensure proper logging of deleted
files and directories.
* Update S3Hook logging level for deleted files and directories from info
to debug to reduce log verbosity.
---
.../src/airflow/providers/amazon/aws/hooks/s3.py | 34 +++++++++-------------
.../amazon/tests/unit/amazon/aws/hooks/test_s3.py | 14 +++++++++
2 files changed, 27 insertions(+), 21 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py
b/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py
index f9beb0d0400..d0f8ce8b784 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py
@@ -1735,27 +1735,19 @@ class S3Hook(AwsBaseHook):
s3_client.delete_bucket_tagging(Bucket=bucket_name)
def _sync_to_local_dir_delete_stale_local_files(self, current_s3_objects:
list[Path], local_dir: Path):
- current_s3_keys = {key for key in current_s3_objects}
-
- for item in local_dir.iterdir():
- item: Path # type: ignore[no-redef]
- absolute_item_path = item.resolve()
-
- if absolute_item_path not in current_s3_keys:
- try:
- if item.is_file():
- item.unlink(missing_ok=True)
- self.log.debug("Deleted stale local file: %s", item)
- elif item.is_dir():
- # delete only when the folder is empty
- if not os.listdir(item):
- item.rmdir()
- self.log.debug("Deleted stale empty directory:
%s", item)
- else:
- self.log.debug("Skipping stale item of unknown type:
%s", item)
- except OSError as e:
- self.log.error("Error deleting stale item %s: %s", item, e)
- raise e
+ current_s3_keys = {key.resolve() for key in current_s3_objects}
+
+ for item in local_dir.rglob("*"):
+ if item.is_file() and item.resolve() not in current_s3_keys:
+ self.log.debug("Deleted stale local file: %s", item)
+ item.unlink()
+ # Clean up empty directories
+ for root, dirs, _ in os.walk(local_dir, topdown=False):
+ for d in dirs:
+ dir_path = os.path.join(root, d)
+ if not os.listdir(dir_path):
+ self.log.debug("Deleted stale empty directory: %s",
dir_path)
+ os.rmdir(dir_path)
def _sync_to_local_dir_if_changed(self, s3_bucket, s3_object,
local_target_path: Path):
should_download = False
diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py
b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py
index 14371da9cb4..99fd45aff89 100644
--- a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py
+++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py
@@ -1870,14 +1870,28 @@ class TestAwsS3Hook:
local_file_that_should_be_deleted.write_text("test dag")
local_folder_should_be_deleted =
Path(sync_local_dir).joinpath("local_folder_should_be_deleted")
local_folder_should_be_deleted.mkdir(exist_ok=True)
+ nested_stale_file = Path(sync_local_dir).joinpath("subproject1",
"stale_nested.py")
+ nested_stale_file.write_text("stale nested file")
+ deep_nested_dir = Path(sync_local_dir).joinpath("subproject1", "deep")
+ deep_nested_dir.mkdir()
+ deep_stale_file = deep_nested_dir.joinpath("stale_deep.py")
+ deep_stale_file.write_text("stale deep file")
hook.log.debug = MagicMock()
hook.sync_to_local_dir(
bucket_name=s3_bucket, local_dir=sync_local_dir, s3_prefix="",
delete_stale=True
)
logs_string = get_logs_string(hook.log.debug.call_args_list)
assert f"Deleted stale local file:
{local_file_that_should_be_deleted.as_posix()}" in logs_string
+ assert f"Deleted stale local file: {nested_stale_file.as_posix()}" in
logs_string
+ assert f"Deleted stale local file: {deep_stale_file.as_posix()}" in
logs_string
assert f"Deleted stale empty directory:
{local_folder_should_be_deleted.as_posix()}" in logs_string
+ assert f"Deleted stale empty directory: {deep_nested_dir.as_posix()}"
in logs_string
+ assert not nested_stale_file.exists()
+ assert not deep_stale_file.exists()
+ assert not deep_nested_dir.exists()
+ assert Path(sync_local_dir).joinpath("dag_01.py").exists()
+ assert Path(sync_local_dir).joinpath("subproject1",
"dag_a.py").exists()
s3_client.put_object(Bucket=s3_bucket, Key="dag_03.py", Body=b"test
data-changed")
hook.log.debug = MagicMock()