This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 2d5c50c38f chore: optmize recoverWith to avoid some materialization
(#1775)
2d5c50c38f is described below
commit 2d5c50c38f15823dcce7da11bf13196105b3ab84
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Fri Feb 28 20:19:33 2025 +0800
chore: optmize recoverWith to avoid some materialization (#1775)
---
.../stream/scaladsl/FlowRecoverWithSpec.scala | 27 +++++++++++++++
.../org/apache/pekko/stream/impl/fusing/Ops.scala | 38 +++++++++++++++++-----
2 files changed, 57 insertions(+), 8 deletions(-)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
index 022ac54bc0..07933c0378 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
@@ -14,6 +14,7 @@
package org.apache.pekko.stream.scaladsl
import scala.annotation.nowarn
+import scala.concurrent.Future
import scala.util.control.NoStackTrace
import org.apache.pekko
@@ -62,6 +63,32 @@ class FlowRecoverWithSpec extends StreamSpec {
.expectComplete()
}
+ "recover with a completed future source" in {
+ Source.failed(ex)
+ .recoverWith { case _: Throwable =>
Source.future(Future.successful(3)) }
+ .runWith(TestSink[Int]())
+ .request(1)
+ .expectNext(3)
+ .expectComplete()
+ }
+
+ "recover with a failed future source" in {
+ Source.failed(ex)
+ .recoverWith { case _: Throwable => Source.future(Future.failed(ex)) }
+ .runWith(TestSink[Int]())
+ .request(1)
+ .expectError(ex)
+ }
+
+ "recover with a java stream source" in {
+ Source.failed(ex)
+ .recoverWith { case _: Throwable => Source.fromJavaStream(() =>
java.util.stream.Stream.of(1, 2, 3)) }
+ .runWith(TestSink[Int]())
+ .request(4)
+ .expectNextN(1 to 3)
+ .expectComplete()
+ }
+
"recover with single source" in {
Source(1 to 4)
.map { a =>
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index 737116fd22..c4607d6429 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -24,7 +24,6 @@ import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.util.{ Failure, Success, Try }
import scala.util.control.{ NoStackTrace, NonFatal }
import scala.util.control.Exception.Catcher
-
import org.apache.pekko
import pekko.actor.{ ActorRef, Terminated }
import pekko.annotation.InternalApi
@@ -36,9 +35,16 @@ import pekko.stream.Attributes.{ InputBuffer, LogLevels }
import pekko.stream.Attributes.SourceLocation
import pekko.stream.OverflowStrategies._
import pekko.stream.Supervision.Decider
-import pekko.stream.impl.{ Buffer => BufferImpl, ContextPropagation,
ReactiveStreamsCompliance, TraversalBuilder }
+import pekko.stream.impl.{
+ Buffer => BufferImpl,
+ ContextPropagation,
+ FailedSource,
+ JavaStreamSource,
+ ReactiveStreamsCompliance,
+ TraversalBuilder
+}
import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource,
SimpleLinearGraphStage, SingleSource }
import pekko.stream.scaladsl.{ DelayStrategy, Source }
import pekko.stream.stage._
import pekko.util.{ unused, ConstantFun, OptionVal }
@@ -2173,12 +2179,28 @@ private[pekko] object TakeWithin {
case _: NotApplied.type => failStage(ex)
case source: Graph[SourceShape[T] @unchecked, M @unchecked] if
TraversalBuilder.isEmptySource(source) =>
completeStage()
- case other: Graph[SourceShape[T] @unchecked, M @unchecked] =>
- TraversalBuilder.getSingleSource(other) match {
- case OptionVal.Some(singleSource) =>
- emit(out, singleSource.elem.asInstanceOf[T], () =>
completeStage())
+ case source: Graph[SourceShape[T] @unchecked, M @unchecked] =>
+ TraversalBuilder.getValuePresentedSource(source) match {
+ case OptionVal.Some(graph) => graph match {
+ case singleSource: SingleSource[T @unchecked] => emit(out,
singleSource.elem, () => completeStage())
+ case failed: FailedSource[T @unchecked] =>
failStage(failed.failure)
+ case futureSource: FutureSource[T @unchecked] =>
futureSource.future.value match {
+ case Some(Success(elem)) => emit(out, elem, () =>
completeStage())
+ case Some(Failure(ex)) => failStage(ex)
+ case None =>
+ switchTo(source)
+ attempt += 1
+ }
+ case iterableSource: IterableSource[T @unchecked] =>
+ emitMultiple(out, iterableSource.elements, () =>
completeStage())
+ case javaStreamSource: JavaStreamSource[T @unchecked, _] =>
+ emitMultiple(out, javaStreamSource.open().spliterator(),
() => completeStage())
+ case _ =>
+ switchTo(source)
+ attempt += 1
+ }
case _ =>
- switchTo(other)
+ switchTo(source)
attempt += 1
}
case _ => throw new IllegalStateException() // won't happen,
compiler exhaustiveness check pleaser
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]