This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c3bf97d6a21 Fix duplicated logs and memory issue with S3 log handler (#67144)
c3bf97d6a21 is described below

commit c3bf97d6a212d22651c8fbf4f5576db8e09f78bf
Author: Jeff Stein <[email protected]>
AuthorDate: Thu May 21 13:18:10 2026 -0700

    Fix duplicated logs and memory issue with S3 log handler (#67144)
---
 .../providers/amazon/aws/log/s3_task_handler.py        |  6 +++++-
 .../tests/unit/amazon/aws/log/test_s3_task_handler.py  | 18 ++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
index 15ffaf2b280..e9cc98ac632 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -62,6 +62,8 @@ class S3RemoteLogIO(LoggingMixin):  # noqa: D101
             has_uploaded = self.write(log, remote_loc)
             if has_uploaded and self.delete_local_copy:
                 shutil.rmtree(os.path.dirname(local_loc))
+            elif has_uploaded:
+                local_loc.write_text("")
 
     @cached_property
     def hook(self):
@@ -119,7 +121,9 @@ class S3RemoteLogIO(LoggingMixin):  # noqa: D101
         try:
             if append and self.s3_log_exists(remote_log_location):
                 old_log = self.s3_read(remote_log_location)
-                log = f"{old_log}\n{log}" if old_log else log
+                if old_log:
+                    sep = "" if old_log.endswith("\n") else "\n"
+                    log = f"{old_log}{sep}{log}"
         except Exception:
             self.log.exception("Could not verify previous log to append")
             return False
diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py
index 58adff9096c..211a213b3b8 100644
--- a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py
+++ b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py
@@ -184,6 +184,24 @@ class TestS3RemoteLogIO:
 
         assert body == b"previous \ntext"
 
+    def test_upload_repeated_appends_no_duplication(self):
+        """Simulate reschedule-mode sensor: each cycle appends to the local log, then uploads.
+
+        Without truncation after upload, the S3 object accumulates duplicate
+        lines and grows O(N^2).  The correct behavior is that each line appears
+        in S3 exactly once.
+        """
+        local_log = self.subject.base_log_folder / "1.log"
+        local_log.parent.mkdir(parents=True, exist_ok=True)
+
+        for cycle in range(1, 4):
+            with open(local_log, "a") as f:
+                f.write(f"cycle {cycle}\n")
+            self.subject.upload(local_log, self.ti)
+
+        body = boto3.resource("s3").Object("bucket", self.remote_log_key).get()["Body"].read()
+        assert body == b"cycle 1\ncycle 2\ncycle 3\n"
+
     def test_write_raises(self, caplog):
         url = "s3://nonexistentbucket/foo"
         with caplog.at_level(logging.ERROR):

Reply via email to