This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 3a1d31e841 =str Avoid subMaterialization when the provided recover
source is empty.
3a1d31e841 is described below
commit 3a1d31e841dd6f48e120bad597c38e87fc452b30
Author: He-Pin <[email protected]>
AuthorDate: Sat Dec 23 14:52:40 2023 +0800
=str Avoid subMaterialization when the provided recover source is empty.
---
.../pekko/stream/scaladsl/FlowRecoverWithSpec.scala | 13 +++++++++++++
.../apache/pekko/stream/impl/TraversalBuilder.scala | 10 ++++++++++
.../org/apache/pekko/stream/impl/fusing/Ops.scala | 19 ++++++++++++++-----
3 files changed, 37 insertions(+), 5 deletions(-)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
index bc6c2b5ef5..831c1e3d07 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
@@ -49,6 +49,19 @@ class FlowRecoverWithSpec extends StreamSpec {
.expectComplete()
}
+ "recover with empty source" in {
+ Source(1 to 4)
+ .map { a =>
+ if (a == 3) throw ex else a
+ }
+ .recoverWith { case _: Throwable => Source.empty }
+ .runWith(TestSink[Int]())
+ .request(2)
+ .expectNextN(1 to 2)
+ .request(1)
+ .expectComplete()
+ }
+
"cancel substream if parent is terminated when there is a handler" in {
Source(1 to 4)
.map { a =>
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
index 8e2c6fc3d3..7ff61a2b3a 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
@@ -379,6 +379,16 @@ import pekko.util.unused
}
}
}
+
+ /**
+ * Test if a Graph is an empty Source.
+ */
+ def isEmptySource(graph: Graph[SourceShape[_], _]): Boolean = graph match {
+ case source: scaladsl.Source[_, _] if source eq scaladsl.Source.empty =>
true
+ case source: javadsl.Source[_, _] if source eq javadsl.Source.empty() =>
true
+ case _ =>
false
+ }
+
}
/**
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index 5c9f45689e..9d1ce1a0b2 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -36,7 +36,7 @@ import pekko.stream.Attributes.{ InputBuffer, LogLevels }
import pekko.stream.Attributes.SourceLocation
import pekko.stream.OverflowStrategies._
import pekko.stream.Supervision.Decider
-import pekko.stream.impl.{ Buffer => BufferImpl, ContextPropagation,
ReactiveStreamsCompliance }
+import pekko.stream.impl.{ Buffer => BufferImpl, ContextPropagation,
ReactiveStreamsCompliance, TraversalBuilder }
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import pekko.stream.scaladsl.{ DelayStrategy, Source }
@@ -2163,12 +2163,21 @@ private[pekko] object TakeWithin {
override def onPull(): Unit = pull(in)
- def onFailure(ex: Throwable): Unit =
- if ((maximumRetries < 0 || attempt < maximumRetries) &&
pf.isDefinedAt(ex)) {
- switchTo(pf(ex))
- attempt += 1
+ def onFailure(ex: Throwable): Unit = {
+ import Collect.NotApplied
+ if (maximumRetries < 0 || attempt < maximumRetries) {
+ pf.applyOrElse(ex, NotApplied) match {
+ case NotApplied => failStage(ex)
+ case source: Graph[SourceShape[T] @unchecked, M @unchecked] if
TraversalBuilder.isEmptySource(source) =>
+ completeStage()
+ case other: Graph[SourceShape[T] @unchecked, M @unchecked] =>
+ switchTo(other)
+ attempt += 1
+ case _ => throw new IllegalStateException() // won't happen,
compiler exhaustiveness check pleaser
+ }
} else
failStage(ex)
+ }
def switchTo(source: Graph[SourceShape[T], M]): Unit = {
val sinkIn = new SubSinkInlet[T]("RecoverWithSink") with InHandler
with OutHandler { self =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]