This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch fix/streamref-graceful-cancel-before-subscription
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 7fb1fb960a49e7912142acb35a3083aed74aa81c
Author: He-Pin <[email protected]>
AuthorDate: Sat May 30 00:22:57 2026 +0800

    fix: await in-flight completion on partner termination in SinkRef
    
    Motivation:
    StreamRefsSpec "pass cancellation upstream across remoting before elements
    has been emitted" is flaky (issue #610, upstream akka#30844). When the
    consuming SourceRef cancels gracefully before any element has been
    emitted, it sends a RemoteStreamCompleted to the origin SinkRef and then
    terminates. On
    transports such as Artery, system messages (the consumer's Terminated /
    deathwatch) travel on a dedicated lane and can overtake the ordinary-lane
    RemoteStreamCompleted. The SinkRef then observed Terminated first and, with
    no completion recorded yet, immediately failed with
    RemoteStreamRefActorTerminatedException ("message loss may have happened"),
    so the origin's watchTermination yielded Failure instead of Done.
    
    The SinkRef already guards its *own* termination against this lane race
    (finishedWithAwaitingPartnerTermination + setKeepGoing), and the SourceRef
    already waits for a finalTerminationSignalDeadline grace period when its
    partner terminates, but the symmetric case on the SinkRef side was missing.
    
    Modification:
    When the SinkRef observes its partner Terminated with no recorded
    completion/failure, schedule a finalTerminationSignalDeadline grace timer
    instead of failing immediately. If the in-flight RemoteStreamCompleted /
    RemoteStreamFailure arrives within the deadline it completes/fails cleanly
    via the existing handlers; otherwise the timer fires and message loss is
    declared as before. Remove the corresponding FIXME from the test.
    
    Result:
    The graceful-cancel-before-subscription case completes cleanly across
    remoting. The full StreamRefsSpec passes (24 tests), and the cancellation
    cases pass in 5 of 5 repeated runs. MiMa reports no binary-compatibility
    issues for the stream module; the change is confined to internal
    SinkRefStageImpl logic.
    
    References:
    Fixes #610
---
 .../pekko/stream/scaladsl/StreamRefsSpec.scala     |  1 -
 .../pekko/stream/impl/streamref/SinkRefImpl.scala  | 38 +++++++++++++++++++---
 2 files changed, 34 insertions(+), 5 deletions(-)

diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StreamRefsSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StreamRefsSpec.scala
index 43cf4c5988..70a006a749 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StreamRefsSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StreamRefsSpec.scala
@@ -358,7 +358,6 @@ class StreamRefsSpec extends 
PekkoSpec(StreamRefsSpec.config()) {
       remoteProbe.expectMsg(Done)
     }
 
-    // FIXME https://github.com/akka/akka/issues/30844
     "pass cancellation upstream across remoting before elements has been 
emitted" in {
       val remoteProbe = TestProbe()(remoteSystem)
       remoteActor.tell("give-nothing-watch", remoteProbe.ref)
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/streamref/SinkRefImpl.scala
 
b/stream/src/main/scala/org/apache/pekko/stream/impl/streamref/SinkRefImpl.scala
index 4da744409d..e0430f0a0c 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/impl/streamref/SinkRefImpl.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/streamref/SinkRefImpl.scala
@@ -81,6 +81,16 @@ private[stream] final class SinkRefStageImpl[In] 
private[pekko] (val initialPart
         inheritedAttributes.get[StreamRefAttributes.SubscriptionTimeout](
           SubscriptionTimeout(settings.subscriptionTimeout))
       }
+
+      @nowarn("msg=deprecated") // can't remove this settings access without 
breaking compat
+      private[this] val finalTerminationSignalDeadline = {
+        import StreamRefAttributes._
+        val settings = eagerMaterializer.settings.streamRefSettings
+        inheritedAttributes
+          .get[StreamRefAttributes.FinalTerminationSignalDeadline](
+            
FinalTerminationSignalDeadline(settings.finalTerminationSignalDeadline))
+          .timeout
+      }
       // end of settings ---
 
       override protected val stageActorName: String = 
streamRefsMaster.nextSinkRefStageName()
@@ -97,6 +107,7 @@ private[stream] final class SinkRefStageImpl[In] 
private[pekko] (val initialPart
         }
 
       val SubscriptionTimeoutTimerKey = "SubscriptionTimeoutKey"
+      val TerminationDeadlineTimerKey = "TerminationDeadlineKey"
 
       // demand management ---
       private var remoteCumulativeDemandReceived: Long = 0L
@@ -152,10 +163,18 @@ private[stream] final class SinkRefStageImpl[In] 
private[pekko] (val initialPart
               case OptionVal.Some(_ /* known to be Success*/ ) =>
                 completeStage() // other side has terminated (in response to a 
completion message) so we can safely terminate
               case _ =>
-                failStage(
-                  RemoteStreamRefActorTerminatedException(
-                    s"Remote target receiver of data $partnerRef terminated. " 
+
-                    s"Local stream terminating, message loss (on remote side) 
may have happened."))
+                // The partner may have cancelled or completed gracefully and 
a RemoteStreamCompleted
+                // or RemoteStreamFailure can still be in flight on the 
ordinary message lane while the
+                // Terminated (system message lane) overtakes it on transports 
such as Artery. Wait for
+                // that final signal up to finalTerminationSignalDeadline 
before assuming message loss,
+                // mirroring the SourceRef side. See 
https://github.com/akka/akka/issues/30844
+                log.debug(
+                  "[{}] Partner [{}] terminated before final signal, waiting 
[{}] for an in-flight completion",
+                  stageActorName,
+                  partnerRef,
+                  PrettyDuration.format(finalTerminationSignalDeadline))
+                scheduleOnce(TerminationDeadlineTimerKey, 
finalTerminationSignalDeadline)
+                setKeepGoing(true)
             }
 
         case (sender, StreamRefsProtocol.CumulativeDemand(d)) =>
@@ -219,6 +238,17 @@ private[stream] final class SinkRefStageImpl[In] 
private[pekko] (val initialPart
 
           throw ex
 
+        case TerminationDeadlineTimerKey =>
+          log.debug(
+            "[{}] No final completion signal arrived from terminated partner 
[{}] within deadline, " +
+            "assuming message loss",
+            stageActorName,
+            partnerRef)
+          failStage(
+            RemoteStreamRefActorTerminatedException(
+              s"Remote target receiver of data $partnerRef terminated. " +
+              s"Local stream terminating, message loss (on remote side) may 
have happened."))
+
         case other => throw new IllegalArgumentException(s"Unknown timer key: 
$other")
       }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to