This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 73668deae9 Improve subscriber timeout error: use SubscriptionTimeoutException instead of AbruptTerminationException (#2645) (#2727)
73668deae9 is described below

commit 73668deae923ba46ba1ce875f2a514a72b71cfb4
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Mar 15 22:41:24 2026 +0800

    Improve subscriber timeout error: use SubscriptionTimeoutException instead of AbruptTerminationException (#2645) (#2727)
    
    When subscriber-timeout fires on a FanoutProcessor with CancelTermination mode,
    the actor now calls fail() with a SubscriptionTimeoutException instead of directly
    calling context.stop(self). This provides a clear error message ('Subscription
    timeout expired, no subscriber attached') rather than a generic
    'Processor actor terminated abruptly' from AbruptTerminationException.
    
    A warning log is also emitted when the timeout fires.
    
    References: https://github.com/apache/pekko/issues/2645
    
    Co-authored-by: Copilot <[email protected]>
---
 .../pekko/stream/impl/FanoutProcessorSpec.scala    | 31 ++++++++++++++++++++++
 .../apache/pekko/stream/impl/FanoutProcessor.scala |  9 +++++--
 2 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutProcessorSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutProcessorSpec.scala
index 92964cbb2d..6880fcc4e1 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutProcessorSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/FanoutProcessorSpec.scala
@@ -13,7 +13,12 @@
 
 package org.apache.pekko.stream.impl
 
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
 import org.apache.pekko
+import pekko.stream.ActorAttributes
+import pekko.stream.StreamSubscriptionTimeoutTerminationMode
 import pekko.stream.scaladsl.Keep
 import pekko.stream.scaladsl.Sink
 import pekko.stream.scaladsl.Source
@@ -108,6 +113,32 @@ class FanoutProcessorSpec extends StreamSpec {
       probe.expectTerminated(publisherRef)
     }
 
+    // #2645
+    "fail with SubscriptionTimeoutException instead of AbruptTerminationException on subscriber timeout" in {
+      val shortTimeout = 300.millis
+      val timeoutAttributes = ActorAttributes.streamSubscriptionTimeout(
+        shortTimeout, StreamSubscriptionTimeoutTerminationMode.CancelTermination)
+
+      val (_, publisher) = Source.maybe[Int]
+        .toMat(Sink.asPublisher(true).addAttributes(timeoutAttributes))(Keep.both)
+        .run()
+
+      // Do NOT subscribe — let the timeout fire
+      val probe = TestProbe()
+      val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl
+      probe.watch(publisherRef)
+
+      // The actor should terminate after the subscription timeout
+      probe.expectTerminated(publisherRef, shortTimeout + 3.seconds)
+
+      // Now try to subscribe after timeout and verify the error type
+      val result = Source.fromPublisher(publisher).runWith(Sink.head)
+      val ex = intercept[SubscriptionTimeoutException] {
+        Await.result(result, 3.seconds)
+      }
+      ex.getMessage should include("Subscription timeout expired")
+    }
+
   }
 
 }
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala
index d217f7b873..36c36c8ece 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutProcessor.scala
@@ -13,6 +13,8 @@
 
 package org.apache.pekko.stream.impl
 
+import scala.util.control.NoStackTrace
+
 import org.apache.pekko
 import pekko.actor.Actor
 import pekko.actor.ActorRef
@@ -172,8 +174,11 @@ import org.reactivestreams.Subscriber
       if (!primaryOutputs.subscribed) {
         timeoutMode match {
           case CancelTermination =>
-            primaryInputs.cancel()
-            context.stop(self)
+            // Use fail() to propagate a SubscriptionTimeoutException downstream instead of
+            // stopping abruptly, which would only produce a non-informative AbruptTerminationException
+            log.warning("Subscription timeout expired for [{}], no subscriber attached in time", self)
+            fail(new SubscriptionTimeoutException(
+              s"Subscription timeout expired, no subscriber attached to [$self]") with NoStackTrace)
           case WarnTermination =>
             log.warning("Subscription timeout for {}", this)
           case NoopTermination => // won't happen


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to