This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/1.3.x by this push:
new a37fa70e8 fix(file): In TarReader avoid prematurely completing source (#1442) (#1473)
a37fa70e8 is described below
commit a37fa70e8b7e5d4b6c0e367d95c4f63dfbf8d1dd
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Mar 2 19:33:56 2026 +0100
fix(file): In TarReader avoid prematurely completing source (#1442) (#1473)
* fix(file): In TarReader avoid prematurely completing source when
processing multi-file TAR archives
fix #1407
* test(file): In TarReader avoid prematurely completing source when
processing multi-file TAR archives
fix #1407
* run sbt scalafmtAll
Co-authored-by: Gaël Bréard <[email protected]>
---
.../file/impl/archive/TarReaderStage.scala | 8 +++-
.../test/scala/docs/scaladsl/TarArchiveSpec.scala | 49 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 2 deletions(-)
diff --git a/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarReaderStage.scala b/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarReaderStage.scala
index b6239f3e9..33a0cb789 100644
--- a/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarReaderStage.scala
+++ b/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarReaderStage.scala
@@ -101,8 +101,12 @@ private[file] class TarReaderStage
val trailerLength = TarArchiveEntry.trailerLength(metadata)
if (buffer.length >= trailerLength) {
subSource.foreach(_.complete())
- if (isClosed(flowIn)) completeStage()
- readHeader(buffer.drop(trailerLength))
+ val remainingBuffer = buffer.drop(trailerLength)
+ if (remainingBuffer.isEmpty && isClosed(flowIn)) {
+ completeStage() // Safe: no more data to process
+ } else {
+ readHeader(remainingBuffer) // Continue processing
+ }
} else setHandlers(flowIn, flowOut, new ReadPastTrailer(metadata, buffer, subSource))
}
diff --git a/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala b/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala
index 97bbb8577..9dfbcbae6 100644
--- a/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala
+++ b/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala
@@ -327,6 +327,55 @@ class TarArchiveSpec
tar.futureValue shouldBe empty
}
+ "extract all files when upstream closes with buffered entries (bug 1407)" in {
+ // Bug 1407: TarReaderStage was completing prematurely when the upstream closed
+ // while the buffer still contained unprocessed file headers/data.
+ // This test simulates the scenario where:
+ // 1. A multi-file TAR archive is streamed as a single ByteString
+ // 2. The upstream closes immediately after sending all data
+ // 3. The buffer contains multiple file entries to process
+
+ val file1Content = ByteString("content1")
+ val file2Content = ByteString("content2")
+ val file3Content = ByteString("content3")
+
+ val metadata1 = TarArchiveMetadata("file1.txt", file1Content.length)
+ val metadata2 = TarArchiveMetadata("file2.txt", file2Content.length)
+ val metadata3 = TarArchiveMetadata("file3.txt", file3Content.length)
+
+ // Create a TAR archive with 3 files, emitted as a single ByteString
+ // This simulates an S3 stream that closes after sending all data at once
+ val multiFileArchive = Source(
+ immutable.Seq(
+ metadata1 -> Source.single(file1Content),
+ metadata2 -> Source.single(file2Content),
+ metadata3 -> Source.single(file3Content)
+ ))
+ .via(Archive.tar())
+ .runWith(collectByteString)
+
+ // Process the archive - the upstream will close after the single ByteString
+ // Without the fix, only the first file(s) would be extracted
+ val tar = Source
+ .future(multiFileArchive)
+ .via(Archive.tarReader())
+ .mapAsync(1) {
+ case (metadata, source) =>
+ source.runWith(collectByteString).map { bs =>
+ metadata -> bs
+ }
+ }
+ .runWith(Sink.seq)
+
+ val result = tar.futureValue
+
+ // All 3 files should be extracted, not just the first one
+ (result should have).length(3)
+ result(0) shouldBe metadata1 -> file1Content
+ result(1) shouldBe metadata2 -> file2Content
+ result(2) shouldBe metadata3 -> file3Content
+ }
+
"fail on missing sub source subscription" in {
val tar =
Source
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]