This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 8aa2a6cd6 fix(tar-reader): buffer upstream data when subSource is not yet pulled (#1475)
8aa2a6cd6 is described below
commit 8aa2a6cd6f6bc6d3542a21212cf55299cef09c5c
Author: Gaël Bréard <[email protected]>
AuthorDate: Wed Mar 4 10:52:24 2026 +0100
fix(tar-reader): buffer upstream data when subSource is not yet pulled (#1475)
When CollectFile.onPush() was called before the downstream had subscribed
to and pulled the subSource outlet, the unconditional call to
subSource.push() violated Pekko's back-pressure protocol and crashed with:
"Cannot push port (SubSourceOutlet(fileOut)) twice, or before it being pulled"
Fix: onPush() now checks subSource.isAvailable before pushing. If the
subSource has not yet been pulled, incoming data is accumulated in the
existing buffer and flushed in the subSource's onPull() handler once
downstream is ready.
Also fix emitted counter and buffer reset in subPush() before handing off to
readTrailer(), and add tryPullIfNeeded() in subSource.onPull() when more
file content is still expected after flushing the buffer.
---
.../file/impl/archive/TarReaderStage.scala | 13 ++++++-
.../test/scala/docs/scaladsl/TarArchiveSpec.scala | 41 ++++++++++++++++++++++
2 files changed, 53 insertions(+), 1 deletion(-)
diff --git a/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarReaderStage.scala b/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarReaderStage.scala
index 33a0cb789..2315ef2f4 100644
--- a/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarReaderStage.scala
+++ b/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarReaderStage.scala
@@ -202,6 +202,8 @@ private[file] class TarReaderStage
subPush(buffer)
buffer = ByteString.empty
if (isClosed(flowIn)) onUpstreamFinish()
+ // If we still need more file content, pull upstream to unblock it
+ else if (emitted < metadata.size) tryPullIfNeeded()
} else {
tryPullIfNeeded()
}
@@ -221,6 +223,8 @@ private[file] class TarReaderStage
if (remaining <= bs.length) {
val (emit, remain) = bs.splitAt(remaining.toInt)
subSource.push(emit)
+ emitted += emit.length
+ buffer = ByteString.empty
readTrailer(metadata, remain, Some(subSource))
} else {
subSource.push(bs)
@@ -229,7 +233,14 @@ private[file] class TarReaderStage
}
override def onPush(): Unit = {
- subPush(grab(flowIn))
+ val data = grab(flowIn)
+ if (subSource.isAvailable) {
+ // subSource has been pulled and is ready: push directly
+ subPush(data)
+ } else {
+ // subSource not yet pulled: buffer until next onPull()
+ buffer ++= data
+ }
}
override def onUpstreamFinish(): Unit = {
diff --git a/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala b/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala
index 61202b746..c58afb6a1 100644
--- a/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala
+++ b/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala
@@ -390,6 +390,47 @@ class TarArchiveSpec
}
}
+ "tar reader regression" should {
+ // Regression test for: Cannot push port (SubSourceOutlet(fileOut)) twice, or before it being pulled
+ //
+ // Root cause: in CollectFile, onPush() called subPush() unconditionally without checking
+ // whether the subSource outlet was ready to accept a push. When upstream data arrived
+ // while subSource had not yet been pulled by downstream, the push violated Pekko's
+ // back-pressure protocol and crashed.
+ //
+ // Fix: onPush() now checks subSource.isAvailable. If false, data is buffered and flushed
+ // in subSource's onPull() handler once downstream is ready.
+ //
+ // The crash depends on interpreter-internal timing (whether flowOut is available at the
+ // exact moment ReadPastTrailer transitions to the next file), which cannot be controlled
+ // deterministically from a unit test. This test instead verifies correct functional
+ // behaviour when a two-file archive is delivered in small chunks, exercising all
+ // transitions between CollectHeader, CollectFile, and ReadPastTrailer across multiple
+ // upstream pushes — the scenario that triggered the crash in production.
+ "handle chunked delivery of a two-file archive" in {
+ val content1 = ByteString("first file content in tar")
+ val content2 = ByteString("second file content in tar")
+
+ val archiveBytes: ByteString =
+ Source(List(
+ TarArchiveMetadata("file1.txt", content1.length) -> Source.single(content1),
+ TarArchiveMetadata("file2.txt", content2.length) -> Source.single(content2)))
+ .via(Archive.tar())
+ .runWith(collectByteString)
+ .futureValue
+
+ // Deliver the archive in 32-byte chunks to exercise all partial-read code paths.
+ val result = Source(archiveBytes.grouped(32).toList)
+ .via(Archive.tarReader())
+ .mapAsync(1) { case (_, subSource: Source[ByteString, NotUsed]) =>
+ subSource.runWith(collectByteString)
+ }
+ .runWith(Sink.seq)
+
+ result.futureValue shouldBe Seq(content1, content2)
+ }
+ }
+
"advanced tar reading" should {
"allow tar files in tar files to be extracted in a single flow" in {
val tenDigits = ByteString("1234567890")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]