This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch 1.5.x
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/1.5.x by this push:
new c6967c4e31 Revert recent changes to recoverWith (#2674) (#2675)
c6967c4e31 is described below
commit c6967c4e31e818d54853c25defb2683f02e801e4
Author: PJ Fanning <[email protected]>
AuthorDate: Thu Feb 26 19:54:17 2026 +0100
Revert recent changes to recoverWith (#2674) (#2675)
* revert recent changes to recoverWith
Update Ops.scala
revert recent changes to recoverWith
* scalafmt
---
.../org/apache/pekko/stream/impl/fusing/Ops.scala | 38 ++++------------------
1 file changed, 7 insertions(+), 31 deletions(-)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index c7c5b852ce..91bdb6110f 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -35,16 +35,9 @@ import pekko.stream.Attributes.{ InputBuffer, LogLevels }
import pekko.stream.Attributes.SourceLocation
import pekko.stream.OverflowStrategies._
import pekko.stream.Supervision.Decider
-import pekko.stream.impl.{
- Buffer => BufferImpl,
- ContextPropagation,
- FailedSource,
- JavaStreamSource,
- ReactiveStreamsCompliance,
- TraversalBuilder
-}
+import pekko.stream.impl.{ Buffer => BufferImpl, ContextPropagation,
ReactiveStreamsCompliance, TraversalBuilder }
import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.GraphStages.{ FutureSource,
SimpleLinearGraphStage, SingleSource }
+import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import pekko.stream.scaladsl.{
DelayStrategy,
Source,
@@ -2183,7 +2176,6 @@ private[pekko] object TakeWithin {
override def onPull(): Unit = pull(in)
@nowarn("msg=Any")
- @tailrec
def onFailure(ex: Throwable): Unit = {
import Collect.NotApplied
if (maximumRetries < 0 || attempt < maximumRetries) {
@@ -2191,28 +2183,12 @@ private[pekko] object TakeWithin {
case _: NotApplied.type
=> failStage(ex)
case source: Graph[SourceShape[T] @unchecked, M @unchecked] if
TraversalBuilder.isEmptySource(source) =>
completeStage()
- case source: Graph[SourceShape[T] @unchecked, M @unchecked] =>
- TraversalBuilder.getValuePresentedSource(source) match {
- case OptionVal.Some(graph) => graph match {
- case singleSource: SingleSource[T @unchecked] => emit(out,
singleSource.elem, () => completeStage())
- case failed: FailedSource[T @unchecked] =>
onFailure(failed.failure)
- case futureSource: FutureSource[T @unchecked] =>
futureSource.future.value match {
- case Some(Success(elem)) => emit(out, elem, () =>
completeStage())
- case Some(Failure(ex)) => onFailure(ex)
- case None =>
- switchTo(source)
- attempt += 1
- }
- case iterableSource: IterableSource[T @unchecked] =>
- emitMultiple(out, iterableSource.elements, () =>
completeStage())
- case javaStreamSource: JavaStreamSource[T @unchecked, _] =>
- emitMultiple(out, javaStreamSource.open().spliterator(),
() => completeStage())
- case _ =>
- switchTo(source)
- attempt += 1
- }
+ case other: Graph[SourceShape[T] @unchecked, M @unchecked] =>
+ TraversalBuilder.getSingleSource(other) match {
+ case OptionVal.Some(singleSource) =>
+ emit(out, singleSource.elem.asInstanceOf[T], () =>
completeStage())
case _ =>
- switchTo(source)
+ switchTo(other)
attempt += 1
}
case _ => throw new IllegalStateException() // won't happen,
compiler exhaustiveness check pleaser
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]