This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 029c5572a5 fix: Fix recoverWith on Failed stage. (#2631)
029c5572a5 is described below
commit 029c5572a5cdb6f10b23d56b502507f6878a5441
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Jan 19 17:34:12 2026 +0800
fix: Fix recoverWith on Failed stage. (#2631)
---
.../stream/scaladsl/FlowRecoverWithSpec.scala | 36 +++++++++++++++++++---
.../apache/pekko/stream/scaladsl/SourceSpec.scala | 29 ++++++++---------
.../org/apache/pekko/stream/impl/fusing/Ops.scala | 5 +--
.../org/apache/pekko/stream/javadsl/Flow.scala | 4 +++
.../org/apache/pekko/stream/javadsl/Source.scala | 4 +++
.../org/apache/pekko/stream/javadsl/SubFlow.scala | 2 ++
.../apache/pekko/stream/javadsl/SubSource.scala | 2 ++
.../org/apache/pekko/stream/scaladsl/Flow.scala | 2 ++
8 files changed, 62 insertions(+), 22 deletions(-)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
index 2ba9ed4a51..47f129a329 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
@@ -73,11 +73,39 @@ class FlowRecoverWithSpec extends StreamSpec {
}
"recover with a failed future source" in {
- Source.failed(ex)
- .recoverWith { case _: Throwable => Source.future(Future.failed(ex)) }
+ val counter = new java.util.concurrent.atomic.AtomicInteger(0)
+ Source.failed[Int](ex)
+ .recoverWith {
+ case _: Throwable =>
+ if (counter.incrementAndGet() < 100) {
+ Source.future(Future.failed(ex))
+ } else {
+ Source.single(101)
+ }
+ }
.runWith(TestSink[Int]())
- .request(1)
- .expectError(ex)
+ .request(100)
+ .expectNext(101)
+ .expectComplete()
+ counter.get() shouldBe 100
+ }
+
+ "recover with a failed source" in {
+ val counter = new java.util.concurrent.atomic.AtomicInteger(0)
+ Source.failed[Int](ex)
+ .recoverWith {
+ case _: Throwable =>
+ if (counter.incrementAndGet() < 100) {
+ Source.failed(ex)
+ } else {
+ Source.single(101)
+ }
+ }
+ .runWith(TestSink[Int]())
+ .request(100)
+ .expectNext(101)
+ .expectComplete()
+ counter.get() shouldBe 100
}
"recover with a java stream source" in {
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index d0d301dc3a..f9059aae07 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -701,7 +701,7 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
val counter = new java.util.concurrent.atomic.AtomicInteger()
val source =
- withRetriesTest(failedSource("origin")) { _ =>
+ withRetriesTest(failedSource("origin")) { () =>
counter.incrementAndGet()
exceptionSource()
} { _ =>
@@ -715,33 +715,30 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
assert(counter.get() == 3)
}
- "not retry FailedSources" in {
- // https://github.com/apache/pekko/issues/2620
+ "should retry on a failed source" in {
val counter = new java.util.concurrent.atomic.AtomicInteger()
val source =
- withRetriesTest(failedSource("origin")) { _ =>
- counter.incrementAndGet()
- failedSource("does not work")
- } { _ =>
- counter.get() < 3
- }
+ withRetriesTest(failedSource("origin")) { () =>
+ if (counter.incrementAndGet() < 3) {
+ failedSource("does not work")
+ } else Source.single(ByteString.fromString("ok"))
+ } { _ => true }
+ .runWith(Sink.head)
+ val result = Await.result(source, Duration.Inf)
+ assert(result.utf8String == "ok")
- assertThrows[ArithmeticException] {
- Await.result(source.runWith(Sink.ignore), Duration.Inf)
- }
-
- assert(counter.get() == 1)
+ assert(counter.get() == 3)
}
}
- private def withRetriesTest(originSource: Source[ByteString, Any])(fallbackTo: Long => Source[ByteString, NotUsed])(
+ private def withRetriesTest(originSource: Source[ByteString, Any])(fallbackTo: () => Source[ByteString, NotUsed])(
shouldRetry: Throwable => Boolean = { _ => true }): Source[ByteString, NotUsed] =
originSource.recoverWithRetries(
-1,
{
case e: Throwable if shouldRetry(e) =>
- fallbackTo(0)
+ fallbackTo()
}
).mapMaterializedValue(_ => NotUsed)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index 352bd64fa7..d02744c8d3 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -2169,6 +2169,7 @@ private[pekko] object TakeWithin {
override def onPull(): Unit = pull(in)
@nowarn("msg=Any")
+ @tailrec
def onFailure(ex: Throwable): Unit = {
import Collect.NotApplied
if (maximumRetries < 0 || attempt < maximumRetries) {
@@ -2180,10 +2181,10 @@ private[pekko] object TakeWithin {
TraversalBuilder.getValuePresentedSource(source) match {
case OptionVal.Some(graph) => graph match {
case singleSource: SingleSource[T @unchecked] => emit(out, singleSource.elem, () => completeStage())
- case failed: FailedSource[T @unchecked] => failStage(failed.failure)
+ case failed: FailedSource[T @unchecked] => onFailure(failed.failure)
case futureSource: FutureSource[T @unchecked] =>
futureSource.future.value match {
case Some(Success(elem)) => emit(out, elem, () => completeStage())
- case Some(Failure(ex)) => failStage(ex)
+ case Some(Failure(ex)) => onFailure(ex)
case None =>
switchTo(source)
attempt += 1
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 88f40cee84..1622dad4c6 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -2078,6 +2078,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
*
+ * It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
+ *
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
@@ -2100,6 +2102,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
*
+ * It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
+ *
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 40c41647f0..cd045591a5 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2317,6 +2317,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
*
+ * It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
+ *
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
@@ -2339,6 +2341,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
*
+ * It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
+ *
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index fa3ee316b1..14f525bb70 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -1427,6 +1427,8 @@ final class SubFlow[In, Out, Mat](
*
* Throwing an exception inside ``recoverWith`` _will_ be logged on ERROR level automatically.
*
+ * It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
+ *
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index 6324c447f9..7edb90afd9 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -1399,6 +1399,8 @@ final class SubSource[Out, Mat](
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
+ * It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
+ *
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 00c7cec924..49d0c8382e 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -906,6 +906,8 @@ trait FlowOps[+Out, +Mat] {
*
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
*
+ * It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`.
+ *
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]