This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new ccce5c0426 feat: Optimize recoverWith stream operator for single
source.
ccce5c0426 is described below
commit ccce5c0426a1dc357a5c06ffd80759592ae4366a
Author: He-Pin <[email protected]>
AuthorDate: Wed Jan 10 00:50:27 2024 +0800
feat: Optimize recoverWith stream operator for single source.
---
.../apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala | 14 ++++++++++++++
.../scala/org/apache/pekko/stream/impl/fusing/Ops.scala | 9 +++++++--
2 files changed, 21 insertions(+), 2 deletions(-)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
index 831c1e3d07..022ac54bc0 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
@@ -62,6 +62,20 @@ class FlowRecoverWithSpec extends StreamSpec {
.expectComplete()
}
+ "recover with single source" in {
+ Source(1 to 4)
+ .map { a =>
+ if (a == 3) throw ex else a
+ }
+ .recoverWith { case _: Throwable => Source.single(3) }
+ .runWith(TestSink[Int]())
+ .request(2)
+ .expectNextN(1 to 2)
+ .request(1)
+ .expectNext(3)
+ .expectComplete()
+ }
+
"cancel substream if parent is terminated when there is a handler" in {
Source(1 to 4)
.map { a =>
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index 8acc1ca7c3..cd8e26de59 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -2174,8 +2174,13 @@ private[pekko] object TakeWithin {
case source: Graph[SourceShape[T] @unchecked, M @unchecked] if
TraversalBuilder.isEmptySource(source) =>
completeStage()
case other: Graph[SourceShape[T] @unchecked, M @unchecked] =>
- switchTo(other)
- attempt += 1
+ TraversalBuilder.getSingleSource(other) match {
+ case OptionVal.Some(singleSource) =>
+ emit(out, singleSource.elem.asInstanceOf[T], () =>
completeStage())
+ case _ =>
+ switchTo(other)
+ attempt += 1
+ }
case _ => throw new IllegalStateException() // won't happen,
compiler exhaustiveness check pleaser
}
} else
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]