This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new ba05791fa0 chore: Handle NormalShutdownReason in MergeHub (#1741)
ba05791fa0 is described below
commit ba05791fa0ea347a5c6ac417bdd6c080b305e363
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Mar 3 19:52:32 2025 +0800
chore: Handle NormalShutdownReason in MergeHub (#1741)
---
.../test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala | 11 +++++++++++
.../src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala | 4 +++-
2 files changed, 14 insertions(+), 1 deletion(-)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
index ce6c4495b2..0a334c3729 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
@@ -21,6 +21,7 @@ import org.apache.pekko
import pekko.Done
import pekko.stream.KillSwitches
import pekko.stream.ThrottleMode
+import pekko.stream.impl.ActorPublisher
import pekko.stream.testkit.StreamSpec
import pekko.stream.testkit.TestPublisher
import pekko.stream.testkit.TestSubscriber
@@ -51,6 +52,16 @@ class HubSpec extends StreamSpec {
upstream.expectCancellation()
}
+ "do not throw exceptions when upstream completes normally" in {
+ EventFilter.error("Upstream producer failed with exception, removing
from MergeHub now",
+ occurrences = 0).intercept {
+ val (sink, result) =
MergeHub.source[Int](16).take(10).toMat(Sink.seq)(Keep.both).run()
+ Source.failed(ActorPublisher.NormalShutdownReason).runWith(sink)
+ Source(1 to 10).runWith(sink)
+ result.futureValue.sorted should ===(1 to 10)
+ }
+ }
+
"notify existing producers if consumer cancels after a few elements" in {
val (sink, result) =
MergeHub.source[Int](16).take(5).toMat(Sink.seq)(Keep.both).run()
val upstream = TestPublisher.probe[Int]()
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
index 94c44005c1..23be24d7f4 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
@@ -33,6 +33,7 @@ import pekko.annotation.InternalApi
import pekko.dispatch.AbstractNodeQueue
import pekko.stream._
import pekko.stream.Attributes.LogLevels
+import pekko.stream.impl.ActorPublisher
import pekko.stream.stage._
/**
@@ -356,7 +357,8 @@ private[pekko] class MergeHub[T](perProducerBufferSize:
Int, drainingEnabled: Bo
// Make some noise
override def onUpstreamFailure(ex: Throwable): Unit = {
- throw new MergeHub.ProducerFailed(
+ if (ex eq ActorPublisher.NormalShutdownReason) completeStage()
+ else throw new MergeHub.ProducerFailed(
"Upstream producer failed with exception, " +
"removing from MergeHub now",
ex)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]