This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new b3d2b3fbf1 chore: disable batch if isVirtualized (#2046)
b3d2b3fbf1 is described below
commit b3d2b3fbf1756b9dc342ec8fe9c5f7d1477eed58
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Aug 23 16:59:39 2025 +0800
chore: disable batch if isVirtualized (#2046)
---
.../apache/pekko/dispatch/AbstractDispatcher.scala | 1 +
.../org/apache/pekko/dispatch/Dispatcher.scala | 8 +++++++-
.../dispatch/ForkJoinExecutorConfigurator.scala | 24 ++++++++--------------
.../apache/pekko/dispatch/ThreadPoolBuilder.scala | 1 +
4 files changed, 18 insertions(+), 16 deletions(-)
diff --git
a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
index 5a0e98f1d1..274c58c53f 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
@@ -411,6 +411,7 @@ abstract class MessageDispatcherConfigurator(_config:
Config, val prerequisites:
final class VirtualThreadExecutorConfigurator(config: Config, prerequisites:
DispatcherPrerequisites)
extends ExecutorServiceConfigurator(config, prerequisites) {
+ override def isVirtualized: Boolean = true
override def createExecutorServiceFactory(id: String, threadFactory:
ThreadFactory): ExecutorServiceFactory = {
import VirtualThreadSupport._
diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala
index 01fe66f0a2..53e09b96f0 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala
@@ -64,6 +64,7 @@ class Dispatcher(
new
LazyExecutorServiceDelegate(executorServiceFactoryProvider.createExecutorServiceFactory(id,
threadFactory))
protected final def executorService: ExecutorServiceDelegate =
executorServiceDelegate
+ private val isVirtualized = executorServiceFactoryProvider.isVirtualized
/**
* INTERNAL API
@@ -71,7 +72,12 @@ class Dispatcher(
protected[pekko] def dispatch(receiver: ActorCell, invocation: Envelope):
Unit = {
val mbox = receiver.mailbox
mbox.enqueue(receiver.self, invocation)
- registerForExecution(mbox, true, false)
+ registerForExecution(mbox, hasMessageHint = true, hasSystemMessageHint =
false)
+ }
+
+ final override def batchable(runnable: Runnable): Boolean = {
+ // If this dispatcher is virtualized, we don't batch; otherwise there would be too many ThreadLocals.
+ if (isVirtualized) false else super.batchable(runnable)
}
/**
diff --git
a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
index 36d9820ae4..66d1e36918 100644
---
a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
+++
b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
@@ -73,6 +73,7 @@ object ForkJoinExecutorConfigurator {
class ForkJoinExecutorConfigurator(config: Config, prerequisites:
DispatcherPrerequisites)
extends ExecutorServiceConfigurator(config, prerequisites) {
import ForkJoinExecutorConfigurator._
+ final override val isVirtualized: Boolean = config.getBoolean("virtualize")
&& JavaVersion.majorVersion >= 21
def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = t
match {
case correct: ForkJoinPool.ForkJoinWorkerThreadFactory => correct
@@ -86,30 +87,23 @@ class ForkJoinExecutorConfigurator(config: Config,
prerequisites: DispatcherPrer
val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
val parallelism: Int,
val asyncMode: Boolean,
- val maxPoolSize: Int,
- val virtualize: Boolean)
+ val maxPoolSize: Int)
extends ExecutorServiceFactory {
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
parallelism: Int,
asyncMode: Boolean,
- maxPoolSize: Int,
- virtualize: Boolean) =
- this(null, threadFactory, parallelism, asyncMode, maxPoolSize,
virtualize)
+ maxPoolSize: Int) =
+ this(null, threadFactory, parallelism, asyncMode, maxPoolSize)
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
parallelism: Int,
- asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode,
ForkJoinPoolConstants.MaxCap, false)
-
- def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
- parallelism: Int,
- asyncMode: Boolean,
- maxPoolSize: Int) = this(threadFactory, parallelism, asyncMode,
maxPoolSize, false)
+ asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode,
ForkJoinPoolConstants.MaxCap)
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
parallelism: Int) =
this(threadFactory, parallelism, asyncMode = true)
def createExecutorService: ExecutorService = {
- val tf = if (virtualize && JavaVersion.majorVersion >= 21) {
+ val tf = if (isVirtualized) {
threadFactory match {
// we need to use the thread factory to create carrier thread
case m: MonitorableThreadFactory => new
MonitorableCarrierThreadFactory(m.name)
@@ -119,7 +113,7 @@ class ForkJoinExecutorConfigurator(config: Config,
prerequisites: DispatcherPrer
val pool = new PekkoForkJoinPool(parallelism, tf, maxPoolSize,
MonitorableThreadFactory.doNothing, asyncMode)
- if (virtualize && JavaVersion.majorVersion >= 21) {
+ if (isVirtualized) {
// when virtualized, we need enhanced thread factory
val factory: ThreadFactory = threadFactory match {
case MonitorableThreadFactory(name, _, contextClassLoader,
exceptionHandler, _) =>
@@ -173,7 +167,7 @@ class ForkJoinExecutorConfigurator(config: Config,
prerequisites: DispatcherPrer
config.getDouble("parallelism-factor"),
config.getInt("parallelism-max")),
asyncMode,
- config.getInt("maximum-pool-size"),
- config.getBoolean("virtualize"))
+ config.getInt("maximum-pool-size")
+ )
}
}
diff --git
a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
index cf5a856ac2..b10cc46724 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
@@ -72,6 +72,7 @@ trait ExecutorServiceFactory {
*/
trait ExecutorServiceFactoryProvider {
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory):
ExecutorServiceFactory
+ def isVirtualized: Boolean = false // can be overridden by implementations
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]