This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 73668deae9 Improve subscriber timeout error: use SubscriptionTimeoutException instead of AbruptTerminationException (#2645) (#2727)
73668deae9 is described below
commit 73668deae923ba46ba1ce875f2a514a72b71cfb4
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Mar 15 22:41:24 2026 +0800
Improve subscriber timeout error: use SubscriptionTimeoutException instead
of AbruptTerminationException (#2645) (#2727)
When subscriber-timeout fires on a FanoutProcessor with CancelTermination mode,
the actor now calls fail() with a SubscriptionTimeoutException instead of directly
calling context.stop(self). This provides a clear error message
('Subscription timeout expired, no subscriber attached') rather than a generic
'Processor actor terminated abruptly' from AbruptTerminationException.
A warning log is also emitted when the timeout fires.
References: https://github.com/apache/pekko/issues/2645
Co-authored-by: Copilot <[email protected]>
---
.../pekko/stream/impl/FanoutProcessorSpec.scala | 31 ++++++++++++++++++++++
.../apache/pekko/stream/impl/FanoutProcessor.scala | 9 +++++--
2 files changed, 38 insertions(+), 2 deletions(-)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutProcessorSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutProcessorSpec.scala
index 92964cbb2d..6880fcc4e1 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutProcessorSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutProcessorSpec.scala
@@ -13,7 +13,12 @@
package org.apache.pekko.stream.impl
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
import org.apache.pekko
+import pekko.stream.ActorAttributes
+import pekko.stream.StreamSubscriptionTimeoutTerminationMode
import pekko.stream.scaladsl.Keep
import pekko.stream.scaladsl.Sink
import pekko.stream.scaladsl.Source
@@ -108,6 +113,32 @@ class FanoutProcessorSpec extends StreamSpec {
probe.expectTerminated(publisherRef)
}
+ // #2645
+ "fail with SubscriptionTimeoutException instead of
AbruptTerminationException on subscriber timeout" in {
+ val shortTimeout = 300.millis
+ val timeoutAttributes = ActorAttributes.streamSubscriptionTimeout(
+ shortTimeout,
StreamSubscriptionTimeoutTerminationMode.CancelTermination)
+
+ val (_, publisher) = Source.maybe[Int]
+
.toMat(Sink.asPublisher(true).addAttributes(timeoutAttributes))(Keep.both)
+ .run()
+
+ // Do NOT subscribe — let the timeout fire
+ val probe = TestProbe()
+ val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl
+ probe.watch(publisherRef)
+
+ // The actor should terminate after the subscription timeout
+ probe.expectTerminated(publisherRef, shortTimeout + 3.seconds)
+
+ // Now try to subscribe after timeout and verify the error type
+ val result = Source.fromPublisher(publisher).runWith(Sink.head)
+ val ex = intercept[SubscriptionTimeoutException] {
+ Await.result(result, 3.seconds)
+ }
+ ex.getMessage should include("Subscription timeout expired")
+ }
+
}
}
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala
index d217f7b873..36c36c8ece 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala
@@ -13,6 +13,8 @@
package org.apache.pekko.stream.impl
+import scala.util.control.NoStackTrace
+
import org.apache.pekko
import pekko.actor.Actor
import pekko.actor.ActorRef
@@ -172,8 +174,11 @@ import org.reactivestreams.Subscriber
if (!primaryOutputs.subscribed) {
timeoutMode match {
case CancelTermination =>
- primaryInputs.cancel()
- context.stop(self)
+ // Use fail() to propagate a SubscriptionTimeoutException
downstream instead of
+ // stopping abruptly, which would only produce a non-informative
AbruptTerminationException
+ log.warning("Subscription timeout expired for [{}], no subscriber
attached in time", self)
+ fail(new SubscriptionTimeoutException(
+ s"Subscription timeout expired, no subscriber attached to
[$self]") with NoStackTrace)
case WarnTermination =>
log.warning("Subscription timeout for {}", this)
case NoopTermination => // won't happen
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]