This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 3f8be563b9 feat: Make SingleConsumerMultiProducer the default mailbox
for stream.
3f8be563b9 is described below
commit 3f8be563b9aa614ed67966b679054621c8ae9ef0
Author: He-Pin <[email protected]>
AuthorDate: Mon Jan 8 19:24:33 2024 +0800
feat: Make SingleConsumerMultiProducer the default mailbox for stream.
---
stream/src/main/resources/reference.conf | 8 ++++++++
.../apache/pekko/stream/impl/ActorMaterializerImpl.scala | 6 +++++-
.../pekko/stream/impl/PhasedFusingActorMaterializer.scala | 14 ++++++++++++--
3 files changed, 25 insertions(+), 3 deletions(-)
diff --git a/stream/src/main/resources/reference.conf
b/stream/src/main/resources/reference.conf
index 2f376ec5ae..17be879104 100644
--- a/stream/src/main/resources/reference.conf
+++ b/stream/src/main/resources/reference.conf
@@ -21,6 +21,14 @@ pekko {
# or full dispatcher configuration to be used by ActorMaterializer when
creating Actors.
dispatcher = "pekko.actor.default-dispatcher"
+ # FQCN of the MailboxType. The Class of the FQCN must have a public
+ # constructor with
+ # (org.apache.pekko.actor.ActorSystem.Settings,
com.typesafe.config.Config) parameters.
+ # defaults to the single consumer mailbox for better performance.
+ mailbox {
+ mailbox-type =
"org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
+ }
+
# Fully qualified config path which holds the dispatcher configuration
# or full dispatcher configuration to be used by stream operators that
# perform blocking operations
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala
index 06391f899f..aaa4d83f17 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala
@@ -63,6 +63,7 @@ import pekko.util.OptionVal
case Dispatchers.DefaultDispatcherId =>
// the caller said to use the default dispatcher, but that can been
trumped by the dispatcher attribute
props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
+ .withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)
case _ => props
}
@@ -195,10 +196,13 @@ private[pekko] class SubFusingActorMaterializerImpl(
* INTERNAL API
*/
@InternalApi private[pekko] object StreamSupervisor {
- def props(attributes: Attributes, haveShutDown: AtomicBoolean): Props =
+ def props(attributes: Attributes, haveShutDown: AtomicBoolean): Props = {
Props(new StreamSupervisor(haveShutDown))
.withDeploy(Deploy.local)
.withDispatcher(attributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
+ .withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)
+ }
+
private[stream] val baseName = "StreamSupervisor"
private val actorName = SeqActorName(baseName)
def nextName(): String = actorName.next()
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala
index 0e58fef976..0bd9f06705 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala
@@ -62,6 +62,8 @@ import pekko.util.OptionVal
val Debug = false
+ val MailboxConfigName: String = "pekko.stream.materializer.mailbox"
+
val DefaultPhase: Phase[Any] = new Phase[Any] {
override def apply(
settings: ActorMaterializerSettings,
@@ -116,7 +118,10 @@ import pekko.util.OptionVal
val dispatcher =
attributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher
val supervisorProps =
- StreamSupervisor.props(attributes,
haveShutDown).withDispatcher(dispatcher).withDeploy(Deploy.local)
+ StreamSupervisor.props(attributes, haveShutDown)
+ .withDispatcher(dispatcher)
+ .withMailbox(MailboxConfigName)
+ .withDeploy(Deploy.local)
// FIXME why do we need a global unique name for the child?
val streamSupervisor = context.actorOf(supervisorProps,
StreamSupervisor.nextName())
@@ -625,6 +630,7 @@ private final case class SavedIslandData(
val effectiveProps = props.dispatcher match {
case Dispatchers.DefaultDispatcherId =>
props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
+ .withMailbox(MailboxConfigName)
case _ => props
}
@@ -819,6 +825,7 @@ private final case class SavedIslandData(
val props = ActorGraphInterpreter
.props(shell)
.withDispatcher(effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
+ .withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)
val actorName = fullIslandName match {
case OptionVal.Some(n) => n
@@ -974,7 +981,10 @@ private final case class SavedIslandData(
val maxInputBuffer =
attributes.mandatoryAttribute[Attributes.InputBuffer].max
val props =
- TLSActor.props(maxInputBuffer, tls.createSSLEngine, tls.verifySession,
tls.closing).withDispatcher(dispatcher)
+ TLSActor.props(maxInputBuffer, tls.createSSLEngine, tls.verifySession,
tls.closing)
+ .withDispatcher(dispatcher)
+ .withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)
+
tlsActor = materializer.actorOf(props, "TLS-for-" + islandName)
def factory(id: Int) = new ActorPublisher[Any](tlsActor) {
override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]