This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new ba05791fa0 chore: Handle NormalShutdownReason in MergeHub (#1741)
ba05791fa0 is described below

commit ba05791fa0ea347a5c6ac417bdd6c080b305e363
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Mar 3 19:52:32 2025 +0800

    chore: Handle NormalShutdownReason in MergeHub (#1741)
---
 .../test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala | 11 +++++++++++
 .../src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala |  4 +++-
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
index ce6c4495b2..0a334c3729 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
@@ -21,6 +21,7 @@ import org.apache.pekko
 import pekko.Done
 import pekko.stream.KillSwitches
 import pekko.stream.ThrottleMode
+import pekko.stream.impl.ActorPublisher
 import pekko.stream.testkit.StreamSpec
 import pekko.stream.testkit.TestPublisher
 import pekko.stream.testkit.TestSubscriber
@@ -51,6 +52,16 @@ class HubSpec extends StreamSpec {
       upstream.expectCancellation()
     }
 
+    "do not throw exceptions when upstream completes normally" in {
+      EventFilter.error("Upstream producer failed with exception, removing from MergeHub now",
+        occurrences = 0).intercept {
+        val (sink, result) = MergeHub.source[Int](16).take(10).toMat(Sink.seq)(Keep.both).run()
+        Source.failed(ActorPublisher.NormalShutdownReason).runWith(sink)
+        Source(1 to 10).runWith(sink)
+        result.futureValue.sorted should ===(1 to 10)
+      }
+    }
+
     "notify existing producers if consumer cancels after a few elements" in {
       val (sink, result) = MergeHub.source[Int](16).take(5).toMat(Sink.seq)(Keep.both).run()
       val upstream = TestPublisher.probe[Int]()
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
index 94c44005c1..23be24d7f4 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
@@ -33,6 +33,7 @@ import pekko.annotation.InternalApi
 import pekko.dispatch.AbstractNodeQueue
 import pekko.stream._
 import pekko.stream.Attributes.LogLevels
+import pekko.stream.impl.ActorPublisher
 import pekko.stream.stage._
 
 /**
@@ -356,7 +357,8 @@ private[pekko] class MergeHub[T](perProducerBufferSize: Int, drainingEnabled: Bo
 
           // Make some noise
           override def onUpstreamFailure(ex: Throwable): Unit = {
-            throw new MergeHub.ProducerFailed(
+            if (ex eq ActorPublisher.NormalShutdownReason) completeStage()
+            else throw new MergeHub.ProducerFailed(
               "Upstream producer failed with exception, " +
               "removing from MergeHub now",
               ex)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to