This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c3bf97d6a21 Fix duplicated logs and memory issue with S3 log handler (#67144)
c3bf97d6a21 is described below
commit c3bf97d6a212d22651c8fbf4f5576db8e09f78bf
Author: Jeff Stein <[email protected]>
AuthorDate: Thu May 21 13:18:10 2026 -0700
Fix duplicated logs and memory issue with S3 log handler (#67144)
---
.../providers/amazon/aws/log/s3_task_handler.py | 6 +++++-
.../tests/unit/amazon/aws/log/test_s3_task_handler.py | 18 ++++++++++++++++++
2 files changed, 23 insertions(+), 1 deletion(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
index 15ffaf2b280..e9cc98ac632 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -62,6 +62,8 @@ class S3RemoteLogIO(LoggingMixin): # noqa: D101
has_uploaded = self.write(log, remote_loc)
if has_uploaded and self.delete_local_copy:
shutil.rmtree(os.path.dirname(local_loc))
+ elif has_uploaded:
+ local_loc.write_text("")
@cached_property
def hook(self):
@@ -119,7 +121,9 @@ class S3RemoteLogIO(LoggingMixin): # noqa: D101
try:
if append and self.s3_log_exists(remote_log_location):
old_log = self.s3_read(remote_log_location)
- log = f"{old_log}\n{log}" if old_log else log
+ if old_log:
+ sep = "" if old_log.endswith("\n") else "\n"
+ log = f"{old_log}{sep}{log}"
except Exception:
self.log.exception("Could not verify previous log to append")
return False
diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py
index 58adff9096c..211a213b3b8 100644
--- a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py
+++ b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py
@@ -184,6 +184,24 @@ class TestS3RemoteLogIO:
assert body == b"previous \ntext"
+ def test_upload_repeated_appends_no_duplication(self):
+ """Simulate reschedule-mode sensor: each cycle appends to the local log, then uploads.
+
+ Without truncation after upload, the S3 object accumulates duplicate
+ lines and grows O(N^2). The correct behavior is that each line appears
+ in S3 exactly once.
+ """
+ local_log = self.subject.base_log_folder / "1.log"
+ local_log.parent.mkdir(parents=True, exist_ok=True)
+
+ for cycle in range(1, 4):
+ with open(local_log, "a") as f:
+ f.write(f"cycle {cycle}\n")
+ self.subject.upload(local_log, self.ti)
+
+ body = boto3.resource("s3").Object("bucket", self.remote_log_key).get()["Body"].read()
+ assert body == b"cycle 1\ncycle 2\ncycle 3\n"
+
def test_write_raises(self, caplog):
url = "s3://nonexistentbucket/foo"
with caplog.at_level(logging.ERROR):