This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch fixRecover in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 9d465f02ebfa32828bc54c5e686fe45bcbd85488 Author: He-Pin <[email protected]> AuthorDate: Thu Mar 5 01:58:52 2026 +0800 fix: Fix recoverWith with add missing attempt back. --- .../stream/scaladsl/FlowRecoverWithSpec.scala | 396 ++++++++++++++++++++- .../org/apache/pekko/stream/impl/fusing/Ops.scala | 10 +- 2 files changed, 402 insertions(+), 4 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala index 47f129a329..a0cafc337c 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala @@ -14,7 +14,8 @@ package org.apache.pekko.stream.scaladsl import scala.annotation.nowarn -import scala.concurrent.Future +import scala.concurrent.{ Future, Promise } +import scala.concurrent.duration._ import scala.util.control.NoStackTrace import org.apache.pekko @@ -251,6 +252,102 @@ class FlowRecoverWithSpec extends StreamSpec { .expectError(ex) } + "terminate with exception after set number of retries with failed source" in { + Source + .failed[Int](ex) + .recoverWithRetries(3, + { + case _: Throwable => Source.failed(ex) + }) + .runWith(TestSink[Int]()) + .request(1) + .expectError(ex) + } + + "terminate with exception after set number of retries with failed future source" in { + Source + .failed[Int](ex) + .recoverWithRetries(3, + { + case _: Throwable => Source.future(Future.failed(ex)) + }) + .runWith(TestSink[Int]()) + .request(1) + .expectError(ex) + } + + "count retries correctly with failed source" in { + val counter = new java.util.concurrent.atomic.AtomicInteger(0) + Source + .failed[Int](ex) + .recoverWithRetries(5, + { + case _: Throwable => + if (counter.incrementAndGet() < 5) { + Source.failed(ex) + } else { + Source.single(42) + } + }) + .runWith(TestSink[Int]()) + .request(1) + .expectNext(42) + .expectComplete() + counter.get() shouldBe 5 + } + + "count retries correctly with failed future source" in { + val counter = new java.util.concurrent.atomic.AtomicInteger(0) + Source + .failed[Int](ex) + .recoverWithRetries(5, + { + case _: Throwable => + if (counter.incrementAndGet() < 5) { + Source.future(Future.failed(ex)) + } else { + Source.single(42) + } + }) + .runWith(TestSink[Int]()) + .request(1) + .expectNext(42) + .expectComplete() + counter.get() shouldBe 5 + } + + "exhaust retries with failed source and then fail" in { + val counter = new java.util.concurrent.atomic.AtomicInteger(0) + Source + .failed[Int](ex) + .recoverWithRetries(3, + { + case _: Throwable => + counter.incrementAndGet() + Source.failed(ex) + }) + .runWith(TestSink[Int]()) + .request(1) + .expectError(ex) + counter.get() shouldBe 3 + } + + "exhaust retries with failed future source and then fail" in { + val counter = new java.util.concurrent.atomic.AtomicInteger(0) + Source + .failed[Int](ex) + .recoverWithRetries(3, + { + case _: Throwable => + counter.incrementAndGet() + Source.future(Future.failed(ex)) + }) + .runWith(TestSink[Int]()) + .request(1) + .expectError(ex) + counter.get() shouldBe 3 + } + "not attempt recovering when attempts is zero" in { Source(1 to 3) .map { a => @@ -310,5 +407,302 @@ class FlowRecoverWithSpec extends StreamSpec { result.failed.futureValue should ===(matFail) } + + "fail when failed source carries an exception not matched by the partial function" in { + val ex2 = new IllegalArgumentException("ex2") with NoStackTrace + Source + .failed[Int](ex) + .recoverWithRetries(3, + { + case _: RuntimeException => Source.failed(ex2) + }) + .runWith(TestSink[Int]()) + .request(1) + .expectError(ex2) + } + + "allow exactly one retry with recoverWithRetries(1, ...)" in { + val counter = new java.util.concurrent.atomic.AtomicInteger(0) + Source + .failed[Int](ex) + .recoverWithRetries(1, + { + case _: Throwable => + counter.incrementAndGet() + Source.single(42) + }) + .runWith(TestSink[Int]()) + .request(1) + .expectNext(42) + .expectComplete() + counter.get() shouldBe 1 + } + + "fail after exactly one retry with failed source and recoverWithRetries(1, ...)" in { + val counter = new java.util.concurrent.atomic.AtomicInteger(0) + Source + .failed[Int](ex) + .recoverWithRetries(1, + { + case _: Throwable => + counter.incrementAndGet() + Source.failed(ex) + }) + .runWith(TestSink[Int]()) + .request(1) + .expectError(ex) + counter.get() shouldBe 1 + } + + "recover with iterable source from failed source" in { + Source + .failed[Int](ex) + .recoverWith { case _: Throwable => Source(List(1, 2, 3)) } + .runWith(TestSink[Int]()) + .request(4) + .expectNextN(List(1, 2, 3)) + .expectComplete() + } + + "terminate after set number of retries with iterable source" in { + Source(1 to 3) + .map { a => + if (a == 3) throw ex else a + } + .recoverWithRetries(2, { case _: Throwable => Source(List(11, 22, 33)).map(m => if (m == 33) throw ex else m) }) + .runWith(TestSink[Int]()) + .request(100) + .expectNextN(List(1, 2)) + .expectNextN(List(11, 22)) + .expectNextN(List(11, 22)) + .expectError(ex) + } + + "recover with a pending future source" in { + val promise = Promise[Int]() + val probe = Source + .failed[Int](ex) + .recoverWith { case _: Throwable => Source.future(promise.future) } + .runWith(TestSink[Int]()) + probe.request(1) + probe.expectNoMessage(200.millis) + promise.success(42) + probe + .expectNext(42) + .expectComplete() + } + + "recover infinitely with failed source when negative (-1) number of attempts given" in { + val counter = new java.util.concurrent.atomic.AtomicInteger(0) + Source + .failed[Int](ex) + .recoverWithRetries(-1, + { + case _: Throwable => + if (counter.incrementAndGet() < 100) { + Source.failed(ex) + } else { + Source.single(42) + } + }) + .runWith(TestSink[Int]()) + .request(1) + .expectNext(42) + .expectComplete() + counter.get() shouldBe 100 + } + + "recover infinitely with failed future source when negative (-1) number of attempts given" in { + val counter = new java.util.concurrent.atomic.AtomicInteger(0) + Source + .failed[Int](ex) + .recoverWithRetries(-1, + { + case _: Throwable => + if (counter.incrementAndGet() < 100) { + Source.future(Future.failed(ex)) + } else { + Source.single(42) + } + }) + .runWith(TestSink[Int]()) + .request(1) + .expectNext(42) + .expectComplete() + counter.get() shouldBe 100 + } + + "count retries correctly with mixed failed source and failed future source" in { + val counter = new java.util.concurrent.atomic.AtomicInteger(0) + Source + .failed[Int](ex) + .recoverWithRetries(5, + { + case _: Throwable => + val count = counter.incrementAndGet() + if (count < 5) { + if (count % 2 == 1) Source.failed(ex) + else Source.future(Future.failed(ex)) + } else { + Source.single(42) + } + }) + .runWith(TestSink[Int]()) + .request(1) + .expectNext(42) + .expectComplete() + counter.get() shouldBe 5 + } + + "recover with failed source then iterable source" in { + val counter = new java.util.concurrent.atomic.AtomicInteger(0) + Source + .failed[Int](ex) + .recoverWith { + case _: Throwable => + if (counter.incrementAndGet() < 3) { + Source.failed(ex) + } else { + Source(List(10, 20, 30)) + } + } + .runWith(TestSink[Int]()) + .request(4) + .expectNextN(List(10, 20, 30)) + .expectComplete() + counter.get() shouldBe 3 + } + + "recover with failed source then java stream source" in { + val counter = new java.util.concurrent.atomic.AtomicInteger(0) + Source + .failed[Int](ex) + .recoverWith { + case _: Throwable => + if (counter.incrementAndGet() < 3) { + Source.failed(ex) + } else { + Source.fromJavaStream(() => java.util.stream.Stream.of(10, 20)) + } + } + .runWith(TestSink[Int]()) + .request(3) + .expectNextN(List(10, 20)) + .expectComplete() + counter.get() shouldBe 3 + } + + "recover with failed source then empty source" in { + val counter = new java.util.concurrent.atomic.AtomicInteger(0) + Source + .failed[Int](ex) + .recoverWith { + case _: Throwable => + if (counter.incrementAndGet() < 3) { + Source.failed(ex) + } else { + Source.empty + } + } + .runWith(TestSink[Int]()) + .request(1) + .expectComplete() + counter.get() shouldBe 3 + } + + "recover with failed source then single source" in { + val counter = new java.util.concurrent.atomic.AtomicInteger(0) + Source + .failed[Int](ex) + .recoverWith { + case _: Throwable => + if (counter.incrementAndGet() < 3) { + Source.failed(ex) + } else { + Source.single(99) + } + } + .runWith(TestSink[Int]()) + .request(1) + .expectNext(99) + .expectComplete() + counter.get() shouldBe 3 + } + + "recover with failed source then completed future source" in { + val counter = new java.util.concurrent.atomic.AtomicInteger(0) + Source + .failed[Int](ex) + .recoverWith { + case _: Throwable => + if (counter.incrementAndGet() < 3) { + Source.failed(ex) + } else { + Source.future(Future.successful(77)) + } + } + .runWith(TestSink[Int]()) + .request(1) + .expectNext(77) + .expectComplete() + counter.get() shouldBe 3 + } + + "recover with failed source carrying different exception types" in { + val ex2 = new IllegalStateException("ex2") with NoStackTrace + val counter = new java.util.concurrent.atomic.AtomicInteger(0) + Source + .failed[Int](ex) + .recoverWithRetries(5, + { + case _: RuntimeException => + if (counter.incrementAndGet() < 3) { + Source.failed(ex2) + } else { + Source.single(42) + } + }) + .runWith(TestSink[Int]()) + .request(1) + .expectNext(42) + .expectComplete() + counter.get() shouldBe 3 + } + + "recover on a Flow" in { + Source(1 to 4) + .via( + Flow[Int] + .map { a => + if (a == 3) throw ex else a + } + .recoverWith { case _: Throwable => Source.single(99) }) + .runWith(TestSink[Int]()) + .request(3) + .expectNextN(List(1, 2, 99)) + .expectComplete() + } + + "recover on a Flow with failed source retries" in { + val counter = new java.util.concurrent.atomic.AtomicInteger(0) + Source + .failed[Int](ex) + .via( + Flow[Int] + .recoverWithRetries(5, + { + case _: Throwable => + if (counter.incrementAndGet() < 3) { + Source.failed(ex) + } else { + Source.single(42) + } + })) + .runWith(TestSink[Int]()) + .request(1) + .expectNext(42) + .expectComplete() + counter.get() shouldBe 3 + } } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index d02744c8d3..26a4a61454 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -2181,11 +2181,15 @@ private[pekko] object TakeWithin { TraversalBuilder.getValuePresentedSource(source) match { case OptionVal.Some(graph) => graph match { case singleSource: SingleSource[T @unchecked] => emit(out, singleSource.elem, () => completeStage()) - case failed: FailedSource[T @unchecked] => onFailure(failed.failure) + case failed: FailedSource[T @unchecked] => + attempt += 1 + onFailure(failed.failure) case futureSource: FutureSource[T @unchecked] => futureSource.future.value match { case Some(Success(elem)) => emit(out, elem, () => completeStage()) - case Some(Failure(ex)) => onFailure(ex) - case None => + case Some(Failure(ex)) => + attempt += 1 + onFailure(ex) + case None => switchTo(source) attempt += 1 } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
