This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch 1.2.x
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/1.2.x by this push:
new 7c5c138863 chore: disable batch if isVirtualized (#2046) (#2059)
7c5c138863 is described below
commit 7c5c138863e836f5ada2bef68b2b62e40a1fbe46
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Aug 23 18:16:52 2025 +0800
chore: disable batch if isVirtualized (#2046) (#2059)
(cherry picked from commit b3d2b3fbf1756b9dc342ec8fe9c5f7d1477eed58)
---
.../apache/pekko/dispatch/AbstractDispatcher.scala | 1 +
.../org/apache/pekko/dispatch/Dispatcher.scala | 8 +++++++-
.../dispatch/ForkJoinExecutorConfigurator.scala | 24 ++++++++--------------
.../apache/pekko/dispatch/ThreadPoolBuilder.scala | 1 +
4 files changed, 18 insertions(+), 16 deletions(-)
diff --git
a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
index dfacb8908b..15a2d61a3e 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
@@ -411,6 +411,7 @@ abstract class MessageDispatcherConfigurator(_config:
Config, val prerequisites:
final class VirtualThreadExecutorConfigurator(config: Config, prerequisites:
DispatcherPrerequisites)
extends ExecutorServiceConfigurator(config, prerequisites) {
+ override def isVirtualized: Boolean = true
override def createExecutorServiceFactory(id: String, threadFactory:
ThreadFactory): ExecutorServiceFactory = {
import VirtualThreadSupport._
diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala
index 01fe66f0a2..53e09b96f0 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala
@@ -64,6 +64,7 @@ class Dispatcher(
new
LazyExecutorServiceDelegate(executorServiceFactoryProvider.createExecutorServiceFactory(id,
threadFactory))
protected final def executorService: ExecutorServiceDelegate =
executorServiceDelegate
+ private val isVirtualized = executorServiceFactoryProvider.isVirtualized
/**
* INTERNAL API
@@ -71,7 +72,12 @@ class Dispatcher(
protected[pekko] def dispatch(receiver: ActorCell, invocation: Envelope):
Unit = {
val mbox = receiver.mailbox
mbox.enqueue(receiver.self, invocation)
- registerForExecution(mbox, true, false)
+ registerForExecution(mbox, hasMessageHint = true, hasSystemMessageHint =
false)
+ }
+
+ final override def batchable(runnable: Runnable): Boolean = {
+ // If this is a virtualized, we don't batch, otherwise, too much
threadLocals.
+ if (isVirtualized) false else super.batchable(runnable)
}
/**
diff --git
a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
index e56b67bc4b..c7d47ced32 100644
---
a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
+++
b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
@@ -78,6 +78,7 @@ object ForkJoinExecutorConfigurator {
class ForkJoinExecutorConfigurator(config: Config, prerequisites:
DispatcherPrerequisites)
extends ExecutorServiceConfigurator(config, prerequisites) {
import ForkJoinExecutorConfigurator._
+ final override val isVirtualized: Boolean = config.getBoolean("virtualize")
&& JavaVersion.majorVersion >= 21
def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = t
match {
case correct: ForkJoinPool.ForkJoinWorkerThreadFactory => correct
@@ -91,24 +92,17 @@ class ForkJoinExecutorConfigurator(config: Config,
prerequisites: DispatcherPrer
val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
val parallelism: Int,
val asyncMode: Boolean,
- val maxPoolSize: Int,
- val virtualize: Boolean)
+ val maxPoolSize: Int)
extends ExecutorServiceFactory {
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
parallelism: Int,
asyncMode: Boolean,
- maxPoolSize: Int,
- virtualize: Boolean) =
- this(null, threadFactory, parallelism, asyncMode, maxPoolSize,
virtualize)
+ maxPoolSize: Int) =
+ this(null, threadFactory, parallelism, asyncMode, maxPoolSize)
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
parallelism: Int,
- asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode,
ForkJoinPoolConstants.MaxCap, false)
-
- def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
- parallelism: Int,
- asyncMode: Boolean,
- maxPoolSize: Int) = this(threadFactory, parallelism, asyncMode,
maxPoolSize, false)
+ asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode,
ForkJoinPoolConstants.MaxCap)
private def pekkoJdk9ForkJoinPoolClassOpt: Option[Class[_]] =
Try(Class.forName("org.apache.pekko.dispatch.PekkoJdk9ForkJoinPool")).toOption
@@ -131,7 +125,7 @@ class ForkJoinExecutorConfigurator(config: Config,
prerequisites: DispatcherPrer
this(threadFactory, parallelism, asyncMode = true)
def createExecutorService: ExecutorService = {
- val tf = if (virtualize && JavaVersion.majorVersion >= 21) {
+ val tf = if (isVirtualized) {
threadFactory match {
// we need to use the thread factory to create carrier thread
case m: MonitorableThreadFactory => new
MonitorableCarrierThreadFactory(m.name)
@@ -148,7 +142,7 @@ class ForkJoinExecutorConfigurator(config: Config,
prerequisites: DispatcherPrer
new PekkoForkJoinPool(parallelism, tf,
MonitorableThreadFactory.doNothing, asyncMode)
}
- if (virtualize && JavaVersion.majorVersion >= 21) {
+ if (isVirtualized) {
// when virtualized, we need enhanced thread factory
val factory: ThreadFactory = threadFactory match {
case MonitorableThreadFactory(name, _, contextClassLoader,
exceptionHandler, _) =>
@@ -202,7 +196,7 @@ class ForkJoinExecutorConfigurator(config: Config,
prerequisites: DispatcherPrer
config.getDouble("parallelism-factor"),
config.getInt("parallelism-max")),
asyncMode,
- config.getInt("maximum-pool-size"),
- config.getBoolean("virtualize"))
+ config.getInt("maximum-pool-size")
+ )
}
}
diff --git
a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
index cf5a856ac2..b10cc46724 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
@@ -72,6 +72,7 @@ trait ExecutorServiceFactory {
*/
trait ExecutorServiceFactoryProvider {
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory):
ExecutorServiceFactory
+ def isVirtualized: Boolean = false // can be overridden by implementations
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]