This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new b3d2b3fbf1 chore: disable batch if isVirtualized (#2046)
b3d2b3fbf1 is described below

commit b3d2b3fbf1756b9dc342ec8fe9c5f7d1477eed58
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Aug 23 16:59:39 2025 +0800

    chore: disable batch if isVirtualized (#2046)
---
 .../apache/pekko/dispatch/AbstractDispatcher.scala |  1 +
 .../org/apache/pekko/dispatch/Dispatcher.scala     |  8 +++++++-
 .../dispatch/ForkJoinExecutorConfigurator.scala    | 24 ++++++++--------------
 .../apache/pekko/dispatch/ThreadPoolBuilder.scala  |  1 +
 4 files changed, 18 insertions(+), 16 deletions(-)

diff --git 
a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala 
b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
index 5a0e98f1d1..274c58c53f 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
@@ -411,6 +411,7 @@ abstract class MessageDispatcherConfigurator(_config: 
Config, val prerequisites:
 
 final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: 
DispatcherPrerequisites)
     extends ExecutorServiceConfigurator(config, prerequisites) {
+  override def isVirtualized: Boolean = true
 
   override def createExecutorServiceFactory(id: String, threadFactory: 
ThreadFactory): ExecutorServiceFactory = {
     import VirtualThreadSupport._
diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala 
b/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala
index 01fe66f0a2..53e09b96f0 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala
@@ -64,6 +64,7 @@ class Dispatcher(
     new 
LazyExecutorServiceDelegate(executorServiceFactoryProvider.createExecutorServiceFactory(id,
 threadFactory))
 
   protected final def executorService: ExecutorServiceDelegate = 
executorServiceDelegate
+  private val isVirtualized = executorServiceFactoryProvider.isVirtualized
 
   /**
    * INTERNAL API
@@ -71,7 +72,12 @@ class Dispatcher(
   protected[pekko] def dispatch(receiver: ActorCell, invocation: Envelope): 
Unit = {
     val mbox = receiver.mailbox
     mbox.enqueue(receiver.self, invocation)
-    registerForExecution(mbox, true, false)
+    registerForExecution(mbox, hasMessageHint = true, hasSystemMessageHint = 
false)
+  }
+
+  final override def batchable(runnable: Runnable): Boolean = {
+    // If this dispatcher is virtualized, we don't batch; otherwise there would be too many ThreadLocals.
+    if (isVirtualized) false else super.batchable(runnable)
   }
 
   /**
diff --git 
a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
 
b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
index 36d9820ae4..66d1e36918 100644
--- 
a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
+++ 
b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
@@ -73,6 +73,7 @@ object ForkJoinExecutorConfigurator {
 class ForkJoinExecutorConfigurator(config: Config, prerequisites: 
DispatcherPrerequisites)
     extends ExecutorServiceConfigurator(config, prerequisites) {
   import ForkJoinExecutorConfigurator._
+  final override val isVirtualized: Boolean = config.getBoolean("virtualize") 
&& JavaVersion.majorVersion >= 21
 
   def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = t 
match {
     case correct: ForkJoinPool.ForkJoinWorkerThreadFactory => correct
@@ -86,30 +87,23 @@ class ForkJoinExecutorConfigurator(config: Config, 
prerequisites: DispatcherPrer
       val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
       val parallelism: Int,
       val asyncMode: Boolean,
-      val maxPoolSize: Int,
-      val virtualize: Boolean)
+      val maxPoolSize: Int)
       extends ExecutorServiceFactory {
     def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
         parallelism: Int,
         asyncMode: Boolean,
-        maxPoolSize: Int,
-        virtualize: Boolean) =
-      this(null, threadFactory, parallelism, asyncMode, maxPoolSize, 
virtualize)
+        maxPoolSize: Int) =
+      this(null, threadFactory, parallelism, asyncMode, maxPoolSize)
 
     def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
         parallelism: Int,
-        asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, 
ForkJoinPoolConstants.MaxCap, false)
-
-    def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
-        parallelism: Int,
-        asyncMode: Boolean,
-        maxPoolSize: Int) = this(threadFactory, parallelism, asyncMode, 
maxPoolSize, false)
+        asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, 
ForkJoinPoolConstants.MaxCap)
 
     def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, 
parallelism: Int) =
       this(threadFactory, parallelism, asyncMode = true)
 
     def createExecutorService: ExecutorService = {
-      val tf = if (virtualize && JavaVersion.majorVersion >= 21) {
+      val tf = if (isVirtualized) {
         threadFactory match {
           // we need to use the thread factory to create carrier thread
           case m: MonitorableThreadFactory => new 
MonitorableCarrierThreadFactory(m.name)
@@ -119,7 +113,7 @@ class ForkJoinExecutorConfigurator(config: Config, 
prerequisites: DispatcherPrer
 
       val pool = new PekkoForkJoinPool(parallelism, tf, maxPoolSize, 
MonitorableThreadFactory.doNothing, asyncMode)
 
-      if (virtualize && JavaVersion.majorVersion >= 21) {
+      if (isVirtualized) {
         // when virtualized, we need enhanced thread factory
         val factory: ThreadFactory = threadFactory match {
           case MonitorableThreadFactory(name, _, contextClassLoader, 
exceptionHandler, _) =>
@@ -173,7 +167,7 @@ class ForkJoinExecutorConfigurator(config: Config, 
prerequisites: DispatcherPrer
         config.getDouble("parallelism-factor"),
         config.getInt("parallelism-max")),
       asyncMode,
-      config.getInt("maximum-pool-size"),
-      config.getBoolean("virtualize"))
+      config.getInt("maximum-pool-size")
+    )
   }
 }
diff --git 
a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala 
b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
index cf5a856ac2..b10cc46724 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
@@ -72,6 +72,7 @@ trait ExecutorServiceFactory {
  */
 trait ExecutorServiceFactoryProvider {
   def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): 
ExecutorServiceFactory
+  def isVirtualized: Boolean = false // can be overridden by implementations
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to