This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 59a2cc81af fix: await in-flight completion on partner termination in SinkRef (#3016)
59a2cc81af is described below
commit 59a2cc81af8eba81a548836afdad1f63ac1d95b8
Author: He-Pin(kerr)
AuthorDate: Sun May 31 21:41:18 2026 +0800
fix: await in-flight completion on partner termination in SinkRef (#3016)
Motivation:
StreamRefsSpec "pass cancellation upstream across remoting before elements
has been emitted" is flaky (issue #610, upstream akka#30844). When the
consuming SourceRef cancels gracefully before any element, it sends a
RemoteStreamCompleted to the origin SinkRef and then terminates. On
transports such as Artery, system messages (the consumer's Terminated /
deathwatch) travel on a dedicated lane and can overtake the ordinary-lane
RemoteStreamCompleted. The SinkRef then observed Terminated first and, with
no completion recorded yet, immediately failed with
RemoteStreamRefActorTerminatedException ("message loss may have happened"),
so the origin's watchTermination yielded Failure instead of Done.
The SinkRef already guards its *own* termination against this lane race
(finishedWithAwaitingPartnerTermination + setKeepGoing), and the SourceRef
already waits a finalTerminationSignalDeadline grace period when its partner
terminates - but the symmetric case on the SinkRef side was missing.
Modification:
When the SinkRef observes its partner Terminated with no recorded
completion/failure, schedule a finalTerminationSignalDeadline grace timer
instead of failing immediately. If the in-flight RemoteStreamCompleted /
RemoteStreamFailure arrives within the deadline it completes/fails cleanly
via the existing handlers; otherwise the timer fires and message loss is
declared as before. Remove the corresponding FIXME from the test.
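
The ordering problem and the grace period described above can be pictured with a small, dependency-free state model. This is only an illustrative sketch, not the SinkRefStageImpl code: the names SinkRefGraceModel, Event, State and their cases are hypothetical and do not exist in Pekko. FinalSignalArrived stands in for RemoteStreamCompleted / RemoteStreamFailure, and DeadlineElapsed stands in for the finalTerminationSignalDeadline timer firing. Treating any final signal as a clean finish is a simplification.

```scala
// Hypothetical model of the decision logic; none of these names are Pekko APIs.
object SinkRefGraceModel {
  sealed trait Event
  case object PartnerTerminated extends Event  // deathwatch Terminated (system message lane)
  case object FinalSignalArrived extends Event // stands in for RemoteStreamCompleted/RemoteStreamFailure
  case object DeadlineElapsed extends Event    // stands in for the grace timer firing

  sealed trait State
  case object Running extends State
  case object AwaitingFinalSignal extends State // grace timer scheduled, stage kept alive
  case object Completed extends State
  case object FailedMessageLoss extends State

  def step(state: State, event: Event): State = (state, event) match {
    // Final signal seen while running: finish cleanly (simplified).
    case (Running, FinalSignalArrived) => Completed
    // Terminated overtook the final signal: wait instead of failing immediately.
    case (Running, PartnerTerminated) => AwaitingFinalSignal
    // The in-flight signal arrived within the deadline.
    case (AwaitingFinalSignal, FinalSignalArrived) => Completed
    // Nothing arrived in time: declare message loss, as before the change.
    case (AwaitingFinalSignal, DeadlineElapsed) => FailedMessageLoss
    // Any other event leaves the state unchanged.
    case (other, _) => other
  }

  // Folds a sequence of events over the initial Running state.
  def run(events: Event*): State = events.foldLeft[State](Running)(step)
}
```

In this model the flaky ordering (PartnerTerminated followed by FinalSignalArrived) ends in Completed, whereas failing directly on PartnerTerminated, as the old code did, would correspond to jumping from Running straight to FailedMessageLoss.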
Result:
The graceful-cancel-before-subscription case completes cleanly across
remoting. Full StreamRefsSpec passes (24 tests), and the cancellation cases
pass 5/5 on repeat. MiMa (stream) reports no binary issues; the change is
confined to internal SinkRefStageImpl logic.
References:
Fixes #610
---
.../pekko/stream/scaladsl/StreamRefsSpec.scala | 1 -
.../pekko/stream/impl/streamref/SinkRefImpl.scala | 38 +++++++++++++++++++---
2 files changed, 34 insertions(+), 5 deletions(-)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StreamRefsSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StreamRefsSpec.scala
index 43cf4c5988..70a006a749 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StreamRefsSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StreamRefsSpec.scala
@@ -358,7 +358,6 @@ class StreamRefsSpec extends PekkoSpec(StreamRefsSpec.config()) {
remoteProbe.expectMsg(Done)
}
- // FIXME https://github.com/akka/akka/issues/30844
"pass cancellation upstream across remoting before elements has been emitted" in {
val remoteProbe = TestProbe()(remoteSystem)
remoteActor.tell("give-nothing-watch", remoteProbe.ref)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/streamref/SinkRefImpl.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/streamref/SinkRefImpl.scala
index 4da744409d..e0430f0a0c 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/streamref/SinkRefImpl.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/streamref/SinkRefImpl.scala
@@ -81,6 +81,16 @@ private[stream] final class SinkRefStageImpl[In] private[pekko] (val initialPart
inheritedAttributes.get[StreamRefAttributes.SubscriptionTimeout](
SubscriptionTimeout(settings.subscriptionTimeout))
}
+
+ @nowarn("msg=deprecated") // can't remove this settings access without breaking compat
+ private[this] val finalTerminationSignalDeadline = {
+ import StreamRefAttributes._
+ val settings = eagerMaterializer.settings.streamRefSettings
+ inheritedAttributes
+ .get[StreamRefAttributes.FinalTerminationSignalDeadline](
+ FinalTerminationSignalDeadline(settings.finalTerminationSignalDeadline))
+ .timeout
+ }
// end of settings ---
override protected val stageActorName: String = streamRefsMaster.nextSinkRefStageName()
@@ -97,6 +107,7 @@ private[stream] final class SinkRefStageImpl[In] private[pekko] (val initialPart
}
val SubscriptionTimeoutTimerKey = "SubscriptionTimeoutKey"
+ val TerminationDeadlineTimerKey = "TerminationDeadlineKey"
// demand management ---
private var remoteCumulativeDemandReceived: Long = 0L
@@ -152,10 +163,18 @@ private[stream] final class SinkRefStageImpl[In] private[pekko] (val initialPart
case OptionVal.Some(_ /* known to be Success*/ ) =>
completeStage() // other side has terminated (in response to a completion message) so we can safely terminate
case _ =>
- failStage(
- RemoteStreamRefActorTerminatedException(
- s"Remote target receiver of data $partnerRef terminated. " +
- s"Local stream terminating, message loss (on remote side) may have happened."))
+ // The partner may have cancelled or completed gracefully and a RemoteStreamCompleted
+ // or RemoteStreamFailure can still be in flight on the ordinary message lane while the
+ // Terminated (system message lane) overtakes it on transports such as Artery. Wait for
+ // that final signal up to finalTerminationSignalDeadline before assuming message loss,
+ // mirroring the SourceRef side. See https://github.com/akka/akka/issues/30844
+ log.debug(
+ "[{}] Partner [{}] terminated before final signal, waiting [{}] for an in-flight completion",
+ stageActorName,
+ partnerRef,
+ PrettyDuration.format(finalTerminationSignalDeadline))
+ scheduleOnce(TerminationDeadlineTimerKey, finalTerminationSignalDeadline)
+ setKeepGoing(true)
+ setKeepGoing(true)
}
case (sender, StreamRefsProtocol.CumulativeDemand(d)) =>
@@ -219,6 +238,17 @@ private[stream] final class SinkRefStageImpl[In] private[pekko] (val initialPart
throw ex
+ case TerminationDeadlineTimerKey =>
+ log.debug(
+ "[{}] No final completion signal arrived from terminated partner [{}] within deadline, " +
+ "assuming message loss",
+ stageActorName,
+ partnerRef)
+ failStage(
+ RemoteStreamRefActorTerminatedException(
+ s"Remote target receiver of data $partnerRef terminated. " +
+ s"Local stream terminating, message loss (on remote side) may have happened."))
+
case other => throw new IllegalArgumentException(s"Unknown timer key: $other")
}