This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 8031d622d1 add test case that shows we don't retry FailedSources
(#2624)
8031d622d1 is described below
commit 8031d622d1362c03c100203dc26b3621316aa204
Author: PJ Fanning <[email protected]>
AuthorDate: Sun Jan 18 10:31:47 2026 +0100
add test case that shows we don't retry FailedSources (#2624)
* add test case that shows we don't retry FailedSources
* Update SourceSpec.scala
* Update SourceSpec.scala
* Update SourceSpec.scala
---
.../apache/pekko/stream/scaladsl/SourceSpec.scala | 57 ++++++++++++++++++++++
1 file changed, 57 insertions(+)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 9cef219a9b..d0d301dc3a 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -29,6 +29,7 @@ import pekko.NotUsed
import pekko.stream.testkit._
import pekko.stream.testkit.scaladsl.TestSink
import pekko.testkit.EventFilter
+import pekko.util.ByteString
import scala.collection.immutable
import scala.concurrent.duration._
@@ -694,4 +695,60 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
.expectComplete()
}
}
+
+ "recoverWithRetries" must {
+ "retry when exceptions occur" in {
+ val counter = new java.util.concurrent.atomic.AtomicInteger() // counts how many times the fallback is built
+
+ val source =
+ withRetriesTest(failedSource("origin")) { _ =>
+ counter.incrementAndGet()
+ exceptionSource()
+ } { _ =>
+ counter.get() < 3 // stop recovering once the fallback has been built three times
+ }
+
+ assertThrows[ArithmeticException] {
+ Await.result(source.runWith(Sink.ignore), Duration.Inf)
+ }
+
+ assert(counter.get() == 3) // origin failure plus two failing fallbacks each triggered one more fallback
+ }
+
+ "not retry FailedSources" in {
+ // documents the behaviour reported in https://github.com/apache/pekko/issues/2620
+ val counter = new java.util.concurrent.atomic.AtomicInteger() // counts how many times the fallback is built
+
+ val source =
+ withRetriesTest(failedSource("origin")) { _ =>
+ counter.incrementAndGet()
+ failedSource("does not work")
+ } { _ =>
+ counter.get() < 3 // same cut-off as the test above, but it is expected never to be reached here
+ }
+
+ assertThrows[ArithmeticException] {
+ Await.result(source.runWith(Sink.ignore), Duration.Inf)
+ }
+
+ assert(counter.get() == 1) // fallback built only once: the failed fallback source is not recovered again
+ }
+ }
+
+ private def withRetriesTest(originSource: Source[ByteString,
Any])(fallbackTo: Long => Source[ByteString, NotUsed])(
+ shouldRetry: Throwable => Boolean = { _ => true }): Source[ByteString,
NotUsed] =
+ originSource.recoverWithRetries(
+ -1, // attempts argument; NOTE(review): a negative value presumably means "no retry limit" - confirm against the recoverWithRetries docs
+ {
+ case e: Throwable if shouldRetry(e) =>
+ fallbackTo(0) // the Long argument is always 0 here; both tests in this spec ignore it
+ }
+ ).mapMaterializedValue(_ => NotUsed)
+
+ private def failedSource(message: String): Source[ByteString, NotUsed] =
+ Source.failed(new ArithmeticException(message)) // source that fails with an ArithmeticException carrying `message`
+
+ // emits a single element whose map stage divides by zero, so the stream fails with an ArithmeticException
+ private def exceptionSource(): Source[ByteString, NotUsed] =
+ Source.single(5).map(_ / 0).map(s => ByteString.fromString(s.toString))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]