This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new fe8a81fb1d fix: complete MapAsyncPartitioned ordered stage after trailing resumed failures (#3009)
fe8a81fb1d is described below

commit fe8a81fb1d802e0442e23c920f76ab6d8c585491
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Fri May 29 22:22:45 2026 +0800

    fix: complete MapAsyncPartitioned ordered stage after trailing resumed failures (#3009)
    
    * fix: complete MapAsyncPartitioned ordered stage after trailing resumed failures
    
    Motivation:
    FlowMapAsyncPartitionedSpec "resume after multiple failures if resume
    supervision is in place" intermittently hung for 60s and failed with a
    ScalaFutures timeout. This is the same completion-path family as the
    drainQueue race fixed in #2899 (see #2903).
    
    Modification:
    pushNextIfPossibleOrdered's non-empty-partitions branch ended with only
    drainQueue(), never calling pullIfNeeded(). The while-loop's Failure
    (Supervision.Resume) branch dequeues elements without a completion check
    (only the Success branch calls pullIfNeeded). So when the final buffered
    elements are resumed failures, the buffer empties with upstream already
    closed but completeStage() is never invoked, hanging the stage. Added a
    trailing pullIfNeeded() after drainQueue(), mirroring the unordered
    branch which already had it.
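To make the failure mode concrete, here is a deliberately simplified, single-threaded Scala model of the ordered drain. It is a sketch under stated assumptions, not Pekko's code: `ToyOrderedDrain`, `Stage`, `drain`, `run` and the `trailingCheck` flag are hypothetical names, and `pullIfNeeded` here models only the completion side of the real method (complete once upstream has finished and the buffer is empty). Feeding 1 to 10 through a function that fails whenever `elem % 4 < 3` leaves 8, 9 and 10 as the last buffered results, all of them resumed failures.

```scala
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

// Hypothetical toy model; NOT the actual MapAsyncPartitioned internals.
object ToyOrderedDrain {
  final class Stage(trailingCheck: Boolean) {
    val buffer = mutable.Queue.empty[Try[Int]]
    val emitted = mutable.ArrayBuffer.empty[Int]
    var upstreamFinished = false
    var completed = false

    // Models only the completion check: complete once upstream has
    // finished and nothing is left in the buffer.
    def pullIfNeeded(): Unit =
      if (upstreamFinished && buffer.isEmpty) completed = true

    // Dequeue every buffered result in order. Only the Success branch
    // runs the completion check; a resumed failure is dropped silently.
    def drain(): Unit = {
      while (buffer.nonEmpty) {
        buffer.dequeue() match {
          case Success(v) =>
            emitted += v
            pullIfNeeded()
          case Failure(_) =>
            () // Supervision.Resume: skip the element, no completion check
        }
      }
      // Models the fix: one completion check after the loop.
      if (trailingCheck) pullIfNeeded()
    }
  }

  // Buffer results for 1 to 10 (failing when elem % 4 < 3), mark upstream
  // as finished, then drain once.
  def run(trailingCheck: Boolean): Stage = {
    val stage = new Stage(trailingCheck)
    (1 to 10).foreach { elem =>
      stage.buffer.enqueue(Try {
        if (elem % 4 < 3) throw new RuntimeException("BOOM!") else elem
      })
    }
    stage.upstreamFinished = true
    stage.drain()
    stage
  }
}
```

In this model, `ToyOrderedDrain.run(trailingCheck = false)` emits 3 and 7 but never sets `completed`, even though the buffer is empty and upstream has finished, while `run(trailingCheck = true)` completes. Placing the check once after the loop, rather than inside the Failure branch, leaves the per-element path unchanged.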
    
    Result:
    The stage now completes once the buffer drains after upstream finish,
    regardless of whether the last elements succeed or are resumed failures.
    The previously timing-out test passes in ~1ms; all 18 specs green across
    5 consecutive runs. The added call is O(1), allocation-free and outside
    the per-element loop, so there is no hot-path or JIT impact.
    
    References:
    - https://github.com/apache/pekko/issues/2903
    - #2899
    
    * test: cover unordered resume after trailing failures in MapAsyncPartitioned
    
    Motivation:
    The ordered completion-path hang is covered by the existing "resume after
    multiple failures" test. The unordered branch already calls pullIfNeeded()
    but had no resume-with-trailing-failures coverage, leaving the symmetric
    completion path unguarded against future regressions.
    
    Modification:
    Added "resume (unordered) after multiple failures if resume supervision is
    in place" mirroring the ordered scenario (final elements 8/9/10 all fail),
    asserting the stage completes and emits the surviving elements.
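As a quick check of the test's expectations, the sketch below works out which inputs survive and which form the trailing run of failures. It only restates the test's failure rule (`elem % 4 < 3` throws) in plain Scala; `ResumeExpectation` and its members are hypothetical names, and no Pekko API is involved.

```scala
// Hypothetical helper restating the test's failure rule in plain Scala.
object ResumeExpectation {
  val inputs: Seq[Int] = 1 to 10

  // Mirrors the test's mapping function: throws when elem % 4 < 3.
  def fails(elem: Int): Boolean = elem % 4 < 3

  // With a resuming decider, failed elements are dropped, so the
  // surviving elements are exactly those that do not fail.
  val survivors: Seq[Int] = inputs.filterNot(fails)

  // The final consecutive failing inputs; a non-empty run here is what
  // exercises the "trailing resumed failures" completion path.
  val trailingFailures: Seq[Int] = inputs.reverse.takeWhile(fails).reverse
}
```

`survivors` is 3 and 7, matching `(1 to 10).filter(_ % 4 == 3)` in both tests, and `trailingFailures` is 8, 9 and 10.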
    
    Result:
    FlowMapAsyncPartitionedSpec runs 19 specs green; the new test completes in
    ~1ms, confirming the unordered path drains and completes after upstream
    finish.
    
    References:
    - https://github.com/apache/pekko/issues/2903
---
 .../scaladsl/FlowMapAsyncPartitionedSpec.scala       | 20 ++++++++++++++++++++
 .../apache/pekko/stream/MapAsyncPartitioned.scala    |  5 +++++
 2 files changed, 25 insertions(+)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
index b8d9dd6d83..8b3635050d 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
@@ -398,6 +398,26 @@ class FlowMapAsyncPartitionedSpec extends StreamSpec with WithLogCapturing {
       expected.futureValue should contain theSameElementsInOrderAs (1 to 10).filter(_ % 4 == 3)
     }
 
+    "resume (unordered) after multiple failures if resume supervision is in place" in {
+      implicit val ec: ExecutionContext = system.dispatcher
+
+      // Mirrors the ordered case: the final elements (8, 9, 10) all fail under the resuming
+      // decider, so the stage must still complete once the buffer drains after upstream finish.
+      // Guards the unordered completion path against the regression fixed for the ordered path.
+      val expected =
+        Source(1 to 10)
+          .mapAsyncPartitionedUnordered(4)(_ % 3) { (elem, _) =>
+            Future {
+              if (elem % 4 < 3) throw new TE("BOOM!")
+              else elem
+            }
+          }
+          .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
+          .runWith(Sink.seq)
+
+      expected.futureValue should contain theSameElementsAs (1 to 10).filter(_ % 4 == 3)
+    }
+
     "ignore null-completed futures" in {
       val shouldBeNull = {
         val n = scala.util.Random.nextInt(10) + 1
diff --git a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
index 24f2d77c1b..565d8a2b51 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
@@ -209,6 +209,11 @@ private[stream] final class MapAsyncPartitioned[In, Out, Partition](
             }
           }
           drainQueue()
+          // The while-loop's Failure (resume) branch dequeues elements without calling pullIfNeeded(),
+          // so when the final buffered elements are resumed failures the buffer empties here without any
+          // completion check. pullIfNeeded() is the only path that runs completeStage() once upstream has
+          // finished, so it must run after draining or the stage hangs forever (mirrors the unordered branch).
+          pullIfNeeded()
         }
 
       private def pushNextIfPossibleUnordered(): Unit =

