This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 8aa2a6cd6 fix(tar-reader): buffer upstream data when subSource is not yet pulled (#1475)
8aa2a6cd6 is described below

commit 8aa2a6cd6f6bc6d3542a21212cf55299cef09c5c
Author: Gaël Bréard <[email protected]>
AuthorDate: Wed Mar 4 10:52:24 2026 +0100

    fix(tar-reader): buffer upstream data when subSource is not yet pulled (#1475)
    
    When CollectFile.onPush() was called before the downstream had subscribed
    to and pulled the subSource outlet, the unconditional call to subSource.push()
    violated Pekko's back-pressure protocol and crashed with:
      "Cannot push port (SubSourceOutlet(fileOut)) twice, or before it being pulled"
    
    Fix: onPush() now checks subSource.isAvailable before pushing. If the subSource
    has not yet been pulled, incoming data is accumulated in the existing buffer and
    flushed in the subSource's onPull() handler once downstream is ready.
    
    Also fix the emitted counter and buffer reset in subPush() before handing off to
    readTrailer(), and add tryPullIfNeeded() in subSource.onPull() when more file
    content is still expected after flushing the buffer.
---
 .../file/impl/archive/TarReaderStage.scala         | 13 ++++++-
 .../test/scala/docs/scaladsl/TarArchiveSpec.scala  | 41 ++++++++++++++++++++++
 2 files changed, 53 insertions(+), 1 deletion(-)

diff --git a/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarReaderStage.scala b/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarReaderStage.scala
index 33a0cb789..2315ef2f4 100644
--- a/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarReaderStage.scala
+++ b/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarReaderStage.scala
@@ -202,6 +202,8 @@ private[file] class TarReaderStage
                 subPush(buffer)
                 buffer = ByteString.empty
                 if (isClosed(flowIn)) onUpstreamFinish()
+                // If we still need more file content, pull upstream to unblock it
+                else if (emitted < metadata.size) tryPullIfNeeded()
               } else {
                 tryPullIfNeeded()
               }
@@ -221,6 +223,8 @@ private[file] class TarReaderStage
           if (remaining <= bs.length) {
             val (emit, remain) = bs.splitAt(remaining.toInt)
             subSource.push(emit)
+            emitted += emit.length
+            buffer = ByteString.empty
             readTrailer(metadata, remain, Some(subSource))
           } else {
             subSource.push(bs)
@@ -229,7 +233,14 @@ private[file] class TarReaderStage
         }
 
         override def onPush(): Unit = {
-          subPush(grab(flowIn))
+          val data = grab(flowIn)
+          if (subSource.isAvailable) {
+            // subSource has been pulled and is ready: push directly
+            subPush(data)
+          } else {
+            // subSource not yet pulled: buffer until next onPull()
+            buffer ++= data
+          }
         }
 
         override def onUpstreamFinish(): Unit = {
diff --git a/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala b/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala
index 61202b746..c58afb6a1 100644
--- a/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala
+++ b/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala
@@ -390,6 +390,47 @@ class TarArchiveSpec
     }
   }
 
+  "tar reader regression" should {
+    // Regression test for: Cannot push port (SubSourceOutlet(fileOut)) twice, or before it being pulled
+    //
+    // Root cause: in CollectFile, onPush() called subPush() unconditionally without checking
+    // whether the subSource outlet was ready to accept a push. When upstream data arrived
+    // while subSource had not yet been pulled by downstream, the push violated Pekko's
+    // back-pressure protocol and crashed.
+    //
+    // Fix: onPush() now checks subSource.isAvailable. If false, data is buffered and flushed
+    // in subSource's onPull() handler once downstream is ready.
+    //
+    // The crash depends on interpreter-internal timing (whether flowOut is available at the
+    // exact moment ReadPastTrailer transitions to the next file), which cannot be controlled
+    // deterministically from a unit test. This test instead verifies correct functional
+    // behaviour when a two-file archive is delivered in small chunks, exercising all
+    // transitions between CollectHeader, CollectFile, and ReadPastTrailer across multiple
+    // upstream pushes — the scenario that triggered the crash in production.
+    "handle chunked delivery of a two-file archive" in {
+      val content1 = ByteString("first file content in tar")
+      val content2 = ByteString("second file content in tar")
+
+      val archiveBytes: ByteString =
+        Source(List(
+          TarArchiveMetadata("file1.txt", content1.length) -> Source.single(content1),
+          TarArchiveMetadata("file2.txt", content2.length) -> Source.single(content2)))
+          .via(Archive.tar())
+          .runWith(collectByteString)
+          .futureValue
+
+      // Deliver the archive in 32-byte chunks to exercise all partial-read code paths.
+      val result = Source(archiveBytes.grouped(32).toList)
+        .via(Archive.tarReader())
+        .mapAsync(1) { case (_, subSource: Source[ByteString, NotUsed]) =>
+          subSource.runWith(collectByteString)
+        }
+        .runWith(Sink.seq)
+
+      result.futureValue shouldBe Seq(content1, content2)
+    }
+  }
+
   "advanced tar reading" should {
     "allow tar files in tar files to be extracted in a single flow" in {
       val tenDigits = ByteString("1234567890")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to