This is an automated email from the ASF dual-hosted git repository. He-Pin pushed a commit to branch optimize-restartflow-delay-cancellation in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 40187de79f3e7382deb6980378ff1eff250ebf1d Author: He-Pin <[email protected]> AuthorDate: Sat May 16 18:16:31 2026 +0800 optimize: remove unnecessary DelayCancellation stage from RestartFlow Motivation: RestartFlow had ~3x less throughput than RetryFlow due to an unnecessary DelayCancellation stage in the inner flow graph. This stage added per-element overhead (extra grab/push/pull operations) even when not needed. Modification: Only apply the DelayCancellation stage when onlyOnFailures mode is enabled. For the common RestartFlow.withBackoff case, the stage is removed. Result: Significantly improved throughput for RestartFlow.withBackoff. The onFailuresWithBackoff path retains the DelayCancellation stage for correctness (race condition fix for #23909). Tests: - All 38 existing RestartSpec tests pass Refs: https://github.com/akka/akka-core/issues/31225 --- .../org/apache/pekko/stream/scaladsl/RestartFlow.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala index 65f2ff9a4c..088873717f 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala @@ -106,10 +106,18 @@ private final class RestartWithBackoffFlow[In, Out]( val sourceOut: SubSourceOutlet[In] = createSubOutlet(in) val sinkIn: SubSinkInlet[Out] = createSubInlet(out) - val graph = Source - .fromGraph(sourceOut.source) - // Temp fix while waiting cause of cancellation. See #23909 - .via(RestartWithBackoffFlow.delayCancellation[In](delay)) + val sourceWithCancellation = + if (onlyOnFailures) { + // Delay cancellation to handle race condition where cancellation arrives + // before failure signal in onlyOnFailures mode. See #23909 + Source + .fromGraph(sourceOut.source) + .via(RestartWithBackoffFlow.delayCancellation[In](delay)) + } else { + Source.fromGraph(sourceOut.source) + } + + val graph = sourceWithCancellation .via(flowFactory()) .to(sinkIn.sink) subFusingMaterializer.materialize(graph, inheritedAttributes) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
