This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch fixRecover in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 57a9853941e516dc601832a1a6ec64d96e315526 Author: He-Pin <[email protected]> AuthorDate: Thu Mar 5 01:42:42 2026 +0800 Revert "Revert recent changes to recoverWith (#2674)" This reverts commit c9906c60fcfac696cf307ca0edc8a80da202dde1. --- .../org/apache/pekko/stream/impl/fusing/Ops.scala | 38 ++++++++++++++++++---- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index 0635e52d51..d02744c8d3 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -36,9 +36,16 @@ import pekko.stream.Attributes.{ InputBuffer, LogLevels } import pekko.stream.Attributes.SourceLocation import pekko.stream.OverflowStrategies._ import pekko.stream.Supervision.Decider -import pekko.stream.impl.{ Buffer => BufferImpl, ContextPropagation, ReactiveStreamsCompliance, TraversalBuilder } +import pekko.stream.impl.{ + Buffer => BufferImpl, + ContextPropagation, + FailedSource, + JavaStreamSource, + ReactiveStreamsCompliance, + TraversalBuilder +} import pekko.stream.impl.Stages.DefaultAttributes -import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SimpleLinearGraphStage, SingleSource } import pekko.stream.scaladsl.{ DelayStrategy, Source, @@ -2162,6 +2169,7 @@ private[pekko] object TakeWithin { override def onPull(): Unit = pull(in) @nowarn("msg=Any") + @tailrec def onFailure(ex: Throwable): Unit = { import Collect.NotApplied if (maximumRetries < 0 || attempt < maximumRetries) { @@ -2169,12 +2177,28 @@ private[pekko] object TakeWithin { case _: NotApplied.type => failStage(ex) case source: Graph[SourceShape[T] @unchecked, M @unchecked] if TraversalBuilder.isEmptySource(source) => completeStage() - case other: Graph[SourceShape[T] @unchecked, M @unchecked] => - TraversalBuilder.getSingleSource(other) match { - case OptionVal.Some(singleSource) => - emit(out, singleSource.elem.asInstanceOf[T], () => completeStage()) + case source: Graph[SourceShape[T] @unchecked, M @unchecked] => + TraversalBuilder.getValuePresentedSource(source) match { + case OptionVal.Some(graph) => graph match { + case singleSource: SingleSource[T @unchecked] => emit(out, singleSource.elem, () => completeStage()) + case failed: FailedSource[T @unchecked] => onFailure(failed.failure) + case futureSource: FutureSource[T @unchecked] => futureSource.future.value match { + case Some(Success(elem)) => emit(out, elem, () => completeStage()) + case Some(Failure(ex)) => onFailure(ex) + case None => + switchTo(source) + attempt += 1 + } + case iterableSource: IterableSource[T @unchecked] => + emitMultiple(out, iterableSource.elements, () => completeStage()) + case javaStreamSource: JavaStreamSource[T @unchecked, _] => + emitMultiple(out, javaStreamSource.open().spliterator(), () => completeStage()) + case _ => + switchTo(source) + attempt += 1 + } case _ => - switchTo(other) + switchTo(source) attempt += 1 } case _ => throw new IllegalStateException() // won't happen, compiler exhaustiveness check pleaser --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
