This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a1d31e841 =str Avoid subMaterialization when the provided recover 
source is empty.
3a1d31e841 is described below

commit 3a1d31e841dd6f48e120bad597c38e87fc452b30
Author: He-Pin <[email protected]>
AuthorDate: Sat Dec 23 14:52:40 2023 +0800

    =str Avoid subMaterialization when the provided recover source is empty.
---
 .../pekko/stream/scaladsl/FlowRecoverWithSpec.scala   | 13 +++++++++++++
 .../apache/pekko/stream/impl/TraversalBuilder.scala   | 10 ++++++++++
 .../org/apache/pekko/stream/impl/fusing/Ops.scala     | 19 ++++++++++++++-----
 3 files changed, 37 insertions(+), 5 deletions(-)

diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
index bc6c2b5ef5..831c1e3d07 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
@@ -49,6 +49,19 @@ class FlowRecoverWithSpec extends StreamSpec {
         .expectComplete()
     }
 
+    "recover with empty source" in {
+      Source(1 to 4)
+        .map { a =>
+          if (a == 3) throw ex else a
+        }
+        .recoverWith { case _: Throwable => Source.empty }
+        .runWith(TestSink[Int]())
+        .request(2)
+        .expectNextN(1 to 2)
+        .request(1)
+        .expectComplete()
+    }
+
     "cancel substream if parent is terminated when there is a handler" in {
       Source(1 to 4)
         .map { a =>
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
index 8e2c6fc3d3..7ff61a2b3a 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
@@ -379,6 +379,16 @@ import pekko.util.unused
         }
     }
   }
+
+  /**
+   * Test if a Graph is an empty Source.
+   */
+  def isEmptySource(graph: Graph[SourceShape[_], _]): Boolean = graph match {
+    case source: scaladsl.Source[_, _] if source eq scaladsl.Source.empty => 
true
+    case source: javadsl.Source[_, _] if source eq javadsl.Source.empty() => 
true
+    case _                                                                => 
false
+  }
+
 }
 
 /**
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index 5c9f45689e..9d1ce1a0b2 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -36,7 +36,7 @@ import pekko.stream.Attributes.{ InputBuffer, LogLevels }
 import pekko.stream.Attributes.SourceLocation
 import pekko.stream.OverflowStrategies._
 import pekko.stream.Supervision.Decider
-import pekko.stream.impl.{ Buffer => BufferImpl, ContextPropagation, 
ReactiveStreamsCompliance }
+import pekko.stream.impl.{ Buffer => BufferImpl, ContextPropagation, 
ReactiveStreamsCompliance, TraversalBuilder }
 import pekko.stream.impl.Stages.DefaultAttributes
 import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
 import pekko.stream.scaladsl.{ DelayStrategy, Source }
@@ -2163,12 +2163,21 @@ private[pekko] object TakeWithin {
 
       override def onPull(): Unit = pull(in)
 
-      def onFailure(ex: Throwable): Unit =
-        if ((maximumRetries < 0 || attempt < maximumRetries) && 
pf.isDefinedAt(ex)) {
-          switchTo(pf(ex))
-          attempt += 1
+      def onFailure(ex: Throwable): Unit = {
+        import Collect.NotApplied
+        if (maximumRetries < 0 || attempt < maximumRetries) {
+          pf.applyOrElse(ex, NotApplied) match {
+            case NotApplied => failStage(ex)
+            case source: Graph[SourceShape[T] @unchecked, M @unchecked] if 
TraversalBuilder.isEmptySource(source) =>
+              completeStage()
+            case other: Graph[SourceShape[T] @unchecked, M @unchecked] =>
+              switchTo(other)
+              attempt += 1
+            case _ => throw new IllegalStateException() // won't happen, 
compiler exhaustiveness check pleaser
+          }
         } else
           failStage(ex)
+      }
 
       def switchTo(source: Graph[SourceShape[T], M]): Unit = {
         val sinkIn = new SubSinkInlet[T]("RecoverWithSink") with InHandler 
with OutHandler { self =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to