This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new ccce5c0426 feat: Optimize recoverWith stream operator for single source.
ccce5c0426 is described below

commit ccce5c0426a1dc357a5c06ffd80759592ae4366a
Author: He-Pin <[email protected]>
AuthorDate: Wed Jan 10 00:50:27 2024 +0800

    feat: Optimize recoverWith stream operator for single source.
---
 .../apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala | 14 ++++++++++++++
 .../scala/org/apache/pekko/stream/impl/fusing/Ops.scala    |  9 +++++++--
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
index 831c1e3d07..022ac54bc0 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
@@ -62,6 +62,20 @@ class FlowRecoverWithSpec extends StreamSpec {
         .expectComplete()
     }
 
+    "recover with single source" in {
+      Source(1 to 4)
+        .map { a =>
+          if (a == 3) throw ex else a
+        }
+        .recoverWith { case _: Throwable => Source.single(3) }
+        .runWith(TestSink[Int]())
+        .request(2)
+        .expectNextN(1 to 2)
+        .request(1)
+        .expectNext(3)
+        .expectComplete()
+    }
+
     "cancel substream if parent is terminated when there is a handler" in {
       Source(1 to 4)
         .map { a =>
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index 8acc1ca7c3..cd8e26de59 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -2174,8 +2174,13 @@ private[pekko] object TakeWithin {
             case source: Graph[SourceShape[T] @unchecked, M @unchecked] if TraversalBuilder.isEmptySource(source) =>
               completeStage()
             case other: Graph[SourceShape[T] @unchecked, M @unchecked] =>
-              switchTo(other)
-              attempt += 1
+              TraversalBuilder.getSingleSource(other) match {
+                case OptionVal.Some(singleSource) =>
+                  emit(out, singleSource.elem.asInstanceOf[T], () => completeStage())
+                case _ =>
+                  switchTo(other)
+                  attempt += 1
+              }
             case _ => throw new IllegalStateException() // won't happen, compiler exhaustiveness check pleaser
           }
         } else


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to