This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f8be563b9 feat: Make SingleConsumerMultiProducer the default mailbox for stream.
3f8be563b9 is described below

commit 3f8be563b9aa614ed67966b679054621c8ae9ef0
Author: He-Pin <[email protected]>
AuthorDate: Mon Jan 8 19:24:33 2024 +0800

    feat: Make SingleConsumerMultiProducer the default mailbox for stream.
---
 stream/src/main/resources/reference.conf                   |  8 ++++++++
 .../apache/pekko/stream/impl/ActorMaterializerImpl.scala   |  6 +++++-
 .../pekko/stream/impl/PhasedFusingActorMaterializer.scala  | 14 ++++++++++++--
 3 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/stream/src/main/resources/reference.conf b/stream/src/main/resources/reference.conf
index 2f376ec5ae..17be879104 100644
--- a/stream/src/main/resources/reference.conf
+++ b/stream/src/main/resources/reference.conf
@@ -21,6 +21,14 @@ pekko {
       # or full dispatcher configuration to be used by ActorMaterializer when creating Actors.
       dispatcher = "pekko.actor.default-dispatcher"
 
+      # FQCN of the MailboxType. The Class of the FQCN must have a public
+      # constructor with
+      # (org.apache.pekko.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
+      # defaults to the single consumer mailbox for better performance.
+      mailbox {
+        mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
+      }
+
       # Fully qualified config path which holds the dispatcher configuration
       # or full dispatcher configuration to be used by stream operators that
       # perform blocking operations
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala
index 06391f899f..aaa4d83f17 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala
@@ -63,6 +63,7 @@ import pekko.util.OptionVal
       case Dispatchers.DefaultDispatcherId =>
         // the caller said to use the default dispatcher, but that can been trumped by the dispatcher attribute
         props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
+          .withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)
       case _ => props
     }
 
@@ -195,10 +196,13 @@ private[pekko] class SubFusingActorMaterializerImpl(
  * INTERNAL API
  */
 @InternalApi private[pekko] object StreamSupervisor {
-  def props(attributes: Attributes, haveShutDown: AtomicBoolean): Props =
+  def props(attributes: Attributes, haveShutDown: AtomicBoolean): Props = {
     Props(new StreamSupervisor(haveShutDown))
       .withDeploy(Deploy.local)
       .withDispatcher(attributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
+      .withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)
+  }
+
   private[stream] val baseName = "StreamSupervisor"
   private val actorName = SeqActorName(baseName)
   def nextName(): String = actorName.next()
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala
index 0e58fef976..0bd9f06705 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala
@@ -62,6 +62,8 @@ import pekko.util.OptionVal
 
   val Debug = false
 
+  val MailboxConfigName: String = "pekko.stream.materializer.mailbox"
+
   val DefaultPhase: Phase[Any] = new Phase[Any] {
     override def apply(
         settings: ActorMaterializerSettings,
@@ -116,7 +118,10 @@ import pekko.util.OptionVal
 
     val dispatcher = attributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher
     val supervisorProps =
-      StreamSupervisor.props(attributes, haveShutDown).withDispatcher(dispatcher).withDeploy(Deploy.local)
+      StreamSupervisor.props(attributes, haveShutDown)
+        .withDispatcher(dispatcher)
+        .withMailbox(MailboxConfigName)
+        .withDeploy(Deploy.local)
 
     // FIXME why do we need a global unique name for the child?
     val streamSupervisor = context.actorOf(supervisorProps, StreamSupervisor.nextName())
@@ -625,6 +630,7 @@ private final case class SavedIslandData(
     val effectiveProps = props.dispatcher match {
       case Dispatchers.DefaultDispatcherId =>
         props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
+          .withMailbox(MailboxConfigName)
       case _ => props
     }
 
@@ -819,6 +825,7 @@ private final case class SavedIslandData(
         val props = ActorGraphInterpreter
           .props(shell)
           .withDispatcher(effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
+          .withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)
 
         val actorName = fullIslandName match {
           case OptionVal.Some(n) => n
@@ -974,7 +981,10 @@ private final case class SavedIslandData(
     val maxInputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer].max
 
     val props =
-      TLSActor.props(maxInputBuffer, tls.createSSLEngine, tls.verifySession, tls.closing).withDispatcher(dispatcher)
+      TLSActor.props(maxInputBuffer, tls.createSSLEngine, tls.verifySession, tls.closing)
+        .withDispatcher(dispatcher)
+        .withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)
+
     tlsActor = materializer.actorOf(props, "TLS-for-" + islandName)
     def factory(id: Int) = new ActorPublisher[Any](tlsActor) {
       override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to