This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new fe8a81fb1d fix: complete MapAsyncPartitioned ordered stage after
trailing resumed failures (#3009)
fe8a81fb1d is described below
commit fe8a81fb1d802e0442e23c920f76ab6d8c585491
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Fri May 29 22:22:45 2026 +0800
fix: complete MapAsyncPartitioned ordered stage after trailing resumed
failures (#3009)
* fix: complete MapAsyncPartitioned ordered stage after trailing resumed
failures
Motivation:
The FlowMapAsyncPartitionedSpec test "resume after multiple failures if
resume supervision is in place" intermittently hung for 60s and then failed
with a ScalaFutures timeout. The hang belongs to the same family of
completion-path bugs as the drainQueue race fixed in #2899 (see #2903).
Modification:
pushNextIfPossibleOrdered's non-empty-partitions branch ended with only
drainQueue() and never called pullIfNeeded(). The while-loop's Failure
(Supervision.Resume) branch dequeues elements without a completion check;
only the Success branch calls pullIfNeeded(), which is the only path that
runs completeStage() once upstream has finished. So when the final buffered
elements are resumed failures, the buffer empties with upstream already
closed but completeStage() is never invoked, and the stage hangs. Added a
trailing pullIfNeeded() after drainQueue(), mirroring the unordered
branch, which already had it.
Result:
The stage now completes once the buffer drains after upstream finish,
regardless of whether the last elements succeed or are resumed failures.
The previously timing-out test now passes in ~1ms, and all 18 specs are
green across 5 consecutive runs. The added call is O(1), allocation-free and
outside the per-element loop, so there is no hot-path or JIT impact.
References:
- https://github.com/apache/pekko/issues/2903
- #2899
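The completion-path gap described above can be illustrated with a small, dependency-free sketch. This is a toy model under stated assumptions, not the real stage: `CompletionSketch`, `ToyOrderedStage`, `offer`, `finishUpstream`, `withTrailingCheck` and `run` are hypothetical names, the real stage's partitioning, demand and asynchronous callbacks are omitted, and `pullIfNeeded()` here only stands in for the completion check. The inputs reuse the test's predicate (elements with `elem % 4 < 3` fail), so elements 8, 9 and 10 are trailing failures.

```scala
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

// Hypothetical toy model for illustration only; NOT the real MapAsyncPartitioned stage.
object CompletionSketch {

  final class ToyOrderedStage(withTrailingCheck: Boolean) {
    private val buffer = mutable.Queue.empty[Try[Int]]
    private var upstreamFinished = false
    var completed = false
    val emitted = mutable.ArrayBuffer.empty[Int]

    def offer(result: Try[Int]): Unit = buffer.enqueue(result)

    def finishUpstream(): Unit = { upstreamFinished = true }

    // Stand-in for pullIfNeeded(): in this model, the only place that completes the stage.
    private def pullIfNeeded(): Unit =
      if (upstreamFinished && buffer.isEmpty) completed = true

    def pushNextIfPossible(): Unit = {
      while (buffer.nonEmpty) {
        buffer.dequeue() match {
          case Success(elem) =>
            emitted += elem
            pullIfNeeded() // completion check only on the Success branch
          case Failure(_) =>
            () // resumed failure: element is dropped, no completion check
        }
      }
      // Models the trailing call added by the fix.
      if (withTrailingCheck) pullIfNeeded()
    }
  }

  // Buffers results for 1 to 10, finishes upstream, then drains the buffer once.
  def run(withTrailingCheck: Boolean): ToyOrderedStage = {
    val stage = new ToyOrderedStage(withTrailingCheck)
    (1 to 10).foreach { elem =>
      stage.offer(if (elem % 4 < 3) Failure(new RuntimeException("BOOM!")) else Success(elem))
    }
    stage.finishUpstream()
    stage.pushNextIfPossible()
    stage
  }

  def main(args: Array[String]): Unit = {
    println(run(withTrailingCheck = false).completed) // prints false: the model never completes
    println(run(withTrailingCheck = true).completed)  // prints true: the trailing check completes it
  }
}
```

In both runs the model emits only 3 and 7. Without the trailing check, the last Success (7) is checked while 8, 9 and 10 are still buffered, and the subsequent failures drain the buffer without any further check.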
* test: cover unordered resume after trailing failures in
MapAsyncPartitioned
Motivation:
The ordered completion-path hang is covered by the existing "resume after
multiple failures" test. The unordered branch already calls pullIfNeeded()
but had no resume-with-trailing-failures coverage, leaving the symmetric
completion path unguarded against future regressions.
Modification:
Added "resume (unordered) after multiple failures if resume supervision is
in place" mirroring the ordered scenario (final elements 8/9/10 all fail),
asserting the stage completes and emits the surviving elements.
Result:
FlowMapAsyncPartitionedSpec runs 19 specs green; the new test completes in
~1ms, confirming the unordered path drains and completes after upstream
finish.
References:
- https://github.com/apache/pekko/issues/2903
---
.../scaladsl/FlowMapAsyncPartitionedSpec.scala | 20 ++++++++++++++++++++
.../apache/pekko/stream/MapAsyncPartitioned.scala | 5 +++++
2 files changed, 25 insertions(+)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
index b8d9dd6d83..8b3635050d 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
@@ -398,6 +398,26 @@ class FlowMapAsyncPartitionedSpec extends StreamSpec with WithLogCapturing {
expected.futureValue should contain theSameElementsInOrderAs (1 to 10).filter(_ % 4 == 3)
}
+ "resume (unordered) after multiple failures if resume supervision is in place" in {
+ implicit val ec: ExecutionContext = system.dispatcher
+
+ // Mirrors the ordered case: the final elements (8, 9, 10) all fail under the resuming
+ // decider, so the stage must still complete once the buffer drains after upstream finish.
+ // Guards the unordered completion path against the regression fixed for the ordered path.
+ val expected =
+ Source(1 to 10)
+ .mapAsyncPartitionedUnordered(4)(_ % 3) { (elem, _) =>
+ Future {
+ if (elem % 4 < 3) throw new TE("BOOM!")
+ else elem
+ }
+ }
+ .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
+ .runWith(Sink.seq)
+
+ expected.futureValue should contain theSameElementsAs (1 to 10).filter(_ % 4 == 3)
+ }
+
"ignore null-completed futures" in {
val shouldBeNull = {
val n = scala.util.Random.nextInt(10) + 1
diff --git a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
index 24f2d77c1b..565d8a2b51 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
@@ -209,6 +209,11 @@ private[stream] final class MapAsyncPartitioned[In, Out, Partition](
}
}
drainQueue()
+ // The while-loop's Failure (resume) branch dequeues elements without calling pullIfNeeded(),
+ // so when the final buffered elements are resumed failures the buffer empties here without any
+ // completion check. pullIfNeeded() is the only path that runs completeStage() once upstream has
+ // finished, so it must run after draining or the stage hangs forever (mirrors the unordered branch).
+ pullIfNeeded()
}
private def pushNextIfPossibleUnordered(): Unit =