This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new e3e01889c8 feat: Support setting starting number of virtual thread. 
(#2242)
e3e01889c8 is described below

commit e3e01889c8931982ed7b6e40adb9d1f42ab35508
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Thu Sep 25 23:59:00 2025 +0800

    feat: Support setting starting number of virtual thread. (#2242)
---
 .../dispatch/ForkJoinPoolVirtualThreadSpec.scala   | 21 ++++++++
 .../dispatch/ThreadPoolVirtualThreadSpec.scala     | 22 ++++++++-
 .../dispatch/VirtualThreadPoolDispatcherSpec.scala |  2 +-
 actor/src/main/resources/reference.conf            | 18 ++++++-
 .../apache/pekko/dispatch/AbstractDispatcher.scala | 16 +++----
 .../dispatch/ForkJoinExecutorConfigurator.scala    |  8 ++--
 .../apache/pekko/dispatch/ThreadPoolBuilder.scala  | 19 ++++++--
 .../pekko/dispatch/VirtualThreadSupport.scala      | 56 ++++++++++++----------
 8 files changed, 115 insertions(+), 47 deletions(-)

diff --git 
a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala
 
b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala
index 550f34ef90..dd97a74a0d 100644
--- 
a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala
+++ 
b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala
@@ -35,6 +35,19 @@ object ForkJoinPoolVirtualThreadSpec {
       |      parallelism-max = 2
       |      parallelism-min = 2
       |      virtualize = on
+      |      virtual-thread-start-number = 0
+      |    }
+      |  }
+      |  task-dispatcher-short {
+      |    mailbox-type = 
"org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
+      |    throughput = 5
+      |    executor = "fork-join-executor"
+      |    fork-join-executor {
+      |      parallelism-factor = 2
+      |      parallelism-max = 2
+      |      parallelism-min = 2
+      |      virtualize = on
+      |      virtual-thread-start-number = -1
       |    }
       |  }
       |}
@@ -63,6 +76,14 @@ class ForkJoinPoolVirtualThreadSpec extends 
PekkoSpec(ForkJoinPoolVirtualThreadS
           name should 
include("ForkJoinPoolVirtualThreadSpec-custom.task-dispatcher-virtual-thread-")
         }
       }
+
+      val actor2 = system.actorOf(Props(new 
ThreadNameActor).withDispatcher("custom.task-dispatcher-short"))
+      for (_ <- 1 to 1000) {
+        actor2 ! "ping"
+        expectMsgPF() { case name: String =>
+          name should 
include("ForkJoinPoolVirtualThreadSpec-custom.task-dispatcher-short-virtual-thread")
+        }
+      }
     }
 
   }
diff --git 
a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ThreadPoolVirtualThreadSpec.scala
 
b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ThreadPoolVirtualThreadSpec.scala
index 406730c233..157174ca88 100644
--- 
a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ThreadPoolVirtualThreadSpec.scala
+++ 
b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ThreadPoolVirtualThreadSpec.scala
@@ -31,8 +31,20 @@ object ThreadPoolVirtualThreadSpec {
       |    throughput = 5
       |    executor = "thread-pool-executor"
       |    thread-pool-executor {
-      |      fixed-pool-size = 4
+      |      fixed-pool-size = 1
       |      virtualize = on
+      |      virtual-thread-start-number = 0
+      |    }
+      |  }
+      |
+      |  task-dispatcher-short {
+      |    mailbox-type = 
"org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
+      |    throughput = 5
+      |    executor = "thread-pool-executor"
+      |    thread-pool-executor {
+      |      fixed-pool-size = 1
+      |      virtualize = on
+      |      virtual-thread-start-number = -1
       |    }
       |  }
       |}
@@ -61,6 +73,14 @@ class ThreadPoolVirtualThreadSpec extends 
PekkoSpec(ThreadPoolVirtualThreadSpec.
           name should 
include("ThreadPoolVirtualThreadSpec-custom.task-dispatcher-virtual-thread-")
         }
       }
+
+      val actor2 = system.actorOf(Props(new 
ThreadNameActor).withDispatcher("custom.task-dispatcher-short"))
+      for (_ <- 1 to 1000) {
+        actor2 ! "ping"
+        expectMsgPF() { case name: String =>
+          name should 
include("ThreadPoolVirtualThreadSpec-custom.task-dispatcher-short-virtual-thread")
+        }
+      }
     }
 
   }
diff --git 
a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala
 
b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala
index 2c8266c6fa..a28607485e 100644
--- 
a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala
+++ 
b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala
@@ -50,7 +50,7 @@ class VirtualThreadPoolDispatcherSpec extends 
PekkoSpec(VirtualThreadPoolDispatc
       for (_ <- 1 to 1000) {
         innocentActor ! "ping"
         expectMsgPF() { case name: String =>
-          name should 
include("VirtualThreadPoolDispatcherSpec-virtual-thread-dispatcher-virtual-thread-")
+          name should 
include("VirtualThreadPoolDispatcherSpec-virtual-thread-dispatcher-virtual-thread")
         }
       }
     }
diff --git a/actor/src/main/resources/reference.conf 
b/actor/src/main/resources/reference.conf
index 740c2975b4..9b12fb106b 100644
--- a/actor/src/main/resources/reference.conf
+++ b/actor/src/main/resources/reference.conf
@@ -492,6 +492,10 @@ pekko {
         #   --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
         #   --add-opens=java.base/java.lang=ALL-UNNAMED
         virtualize = off
+
+        # When virtualize = on, you can set the starting id for virtual 
threads created by this dispatcher.
+        # -1 (default) to turn off, 0 to start from 0, or any positive number 
to start from that number.
+        virtual-thread-start-number = -1
       }
 
       # This will be used if you have set "executor = "thread-pool-executor""
@@ -529,7 +533,7 @@ pekko {
         # ceil(available processors * factor)
         # The maximumPoolSize will not be less than corePoolSize.
         # It is only used if using a bounded task queue.
-        max-pool-size-factor  = 3.0
+        max-pool-size-factor = 3.0
 
         # Max number of threads to cap factor-based maximumPoolSize number to
         max-pool-size-max = 64
@@ -555,6 +559,10 @@ pekko {
         #   --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
         #   --add-opens=java.base/java.lang=ALL-UNNAMED
         virtualize = off
+
+        # When virtualize = on, you can set the starting number for virtual 
threads created by this dispatcher.
+        # -1 (default) to turn off, 0 to start from 0, or any positive number 
to start from that number.
+        virtual-thread-start-number = -1
       }
 
       # This will be used if you have set "executor = "virtual-thread-executor"
@@ -569,6 +577,10 @@ pekko {
         #jdk.virtualThreadScheduler.minRunnable
         #jdk.unparker.maxPoolSize
         fallback = "fork-join-executor"
+
+        # When using virtual-thread-executor, you can set the starting number 
for virtual threads created by this dispatcher.
+        # -1 (default) to turn off, 0 to start from 0, or any positive number 
to start from that number.
+        virtual-thread-start-number = -1
       }
       # How long time the dispatcher will wait for new actors until it shuts 
down
       shutdown-timeout = 1s
@@ -623,6 +635,10 @@ pekko {
         #   --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
         #   --add-opens=java.base/java.lang=ALL-UNNAMED
         virtualize = off
+
+        # When virtualize = on, you can set the starting number for virtual 
threads created by this dispatcher.
+        # -1 (default) to turn off, 0 to start from 0, or any positive number 
to start from that number.
+        virtual-thread-start-number = -1
       }
     }
 
diff --git 
a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala 
b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
index 4c4e74d448..142ac836fe 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
@@ -28,7 +28,7 @@ import pekko.dispatch.affinity.AffinityPoolConfigurator
 import pekko.dispatch.sysmsg._
 import pekko.event.EventStream
 import pekko.event.Logging.{ emptyMDC, Debug, Error, LogEventException, 
Warning }
-import pekko.util.{ Index, JavaVersion }
+import pekko.util.Index
 
 import com.typesafe.config.Config
 
@@ -412,14 +412,14 @@ abstract class MessageDispatcherConfigurator(_config: 
Config, val prerequisites:
 
 final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: 
DispatcherPrerequisites)
     extends ExecutorServiceConfigurator(config, prerequisites) {
-  override def isVirtualized: Boolean = true
-
+  override def isVirtualized: Boolean = VirtualThreadSupport.isSupported
+  override def virtualThreadStartNumber: Int = 
config.getInt("virtual-thread-start-number")
   override def createExecutorServiceFactory(id: String, threadFactory: 
ThreadFactory): ExecutorServiceFactory = {
     import VirtualThreadSupport._
     val tf: ThreadFactory = threadFactory match {
       case MonitorableThreadFactory(name, _, contextClassLoader, 
exceptionHandler, _) =>
         new ThreadFactory {
-          private val vtFactory = newVirtualThreadFactory(name + "-" + id)
+          private val vtFactory = newVirtualThreadFactory(name + "-" + id, 
virtualThreadStartNumber)
 
           override def newThread(r: Runnable): Thread = {
             val vt = vtFactory.newThread(r)
@@ -428,7 +428,7 @@ final class VirtualThreadExecutorConfigurator(config: 
Config, prerequisites: Dis
             vt
           }
         }
-      case _ => newVirtualThreadFactory(prerequisites.settings.name + "-" + 
id);
+      case _ => newVirtualThreadFactory(prerequisites.settings.name + "-" + 
id, virtualThreadStartNumber);
     }
     new ExecutorServiceFactory {
       override def createExecutorService: ExecutorService with LoadMetrics = {
@@ -502,7 +502,7 @@ trait ThreadPoolExecutorServiceFactoryProvider extends 
ExecutorServiceFactoryPro
             case m: MonitorableThreadFactory => m.name + "-" + id
             case _                           => id
           }
-          createVirtualized(tf, pool, prefixName)
+          createVirtualized(tf, pool, prefixName, virtualThreadStartNumber)
         } else pool
       }
     }
@@ -514,8 +514,8 @@ class ThreadPoolExecutorConfigurator(config: Config, 
prerequisites: DispatcherPr
     extends ExecutorServiceConfigurator(config, prerequisites)
     with ThreadPoolExecutorServiceFactoryProvider {
   override val threadPoolConfig: ThreadPoolConfig = 
createThreadPoolConfigBuilder(config, prerequisites).config
-  override val isVirtualized: Boolean = threadPoolConfig.isVirtualized && 
JavaVersion.majorVersion >= 21
-
+  override val isVirtualized: Boolean = threadPoolConfig.isVirtualized && 
VirtualThreadSupport.isSupported
+  override def virtualThreadStartNumber: Int = 
config.getInt("virtual-thread-start-number")
   protected def createThreadPoolConfigBuilder(
       config: Config,
       @nowarn("msg=never used") prerequisites: DispatcherPrerequisites): 
ThreadPoolConfigBuilder = {
diff --git 
a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
 
b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
index 7aa1aeabdd..e5d7e7f8ce 100644
--- 
a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
+++ 
b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
@@ -15,9 +15,6 @@ package org.apache.pekko.dispatch
 
 import java.util.concurrent.{ ExecutorService, ForkJoinPool, ForkJoinTask, 
ThreadFactory, TimeUnit }
 
-import org.apache.pekko
-import pekko.util.JavaVersion
-
 import com.typesafe.config.Config
 
 object ForkJoinExecutorConfigurator {
@@ -73,7 +70,8 @@ object ForkJoinExecutorConfigurator {
 class ForkJoinExecutorConfigurator(config: Config, prerequisites: 
DispatcherPrerequisites)
     extends ExecutorServiceConfigurator(config, prerequisites) {
   import ForkJoinExecutorConfigurator._
-  final override val isVirtualized: Boolean = config.getBoolean("virtualize") 
&& JavaVersion.majorVersion >= 21
+  final override val isVirtualized: Boolean = config.getBoolean("virtualize") 
&& VirtualThreadSupport.isSupported
+  override def virtualThreadStartNumber: Int = 
config.getInt("virtual-thread-start-number")
 
   def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = t 
match {
     case correct: ForkJoinPool.ForkJoinWorkerThreadFactory => correct
@@ -115,7 +113,7 @@ class ForkJoinExecutorConfigurator(config: Config, 
prerequisites: DispatcherPrer
 
       if (isVirtualized) {
         // we need to cast here,
-        createVirtualized(threadFactory.asInstanceOf[ThreadFactory], pool, id)
+        createVirtualized(threadFactory.asInstanceOf[ThreadFactory], pool, id, 
virtualThreadStartNumber)
       } else {
         pool
       }
diff --git 
a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala 
b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
index 093c5645f1..e1321c7cab 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
@@ -77,17 +77,26 @@ trait ExecutorServiceFactory {
  */
 trait ExecutorServiceFactoryProvider {
   def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): 
ExecutorServiceFactory
+
+  /**
+   * Whether the executor service created by this factory should use virtual 
threads.
+   */
   def isVirtualized: Boolean = false // can be overridden by implementations
 
-  protected def createVirtualized(
-      threadFactory: ThreadFactory,
+  /**
+   * The starting number of the virtual thread name, if -1, the number will 
not be appended.
+   */
+  def virtualThreadStartNumber: Int = 0 // can be overridden by implementations
+
+  protected def createVirtualized(threadFactory: ThreadFactory,
       pool: ExecutorService with LoadMetrics,
-      prefixName: String): ExecutorService = {
+      prefixName: String,
+      startNumber: Int): ExecutorService = {
     // when virtualized, we need enhanced thread factory
     val factory: ThreadFactory = threadFactory match {
       case MonitorableThreadFactory(name, _, contextClassLoader, 
exceptionHandler, _) =>
         new ThreadFactory {
-          private val vtFactory = newVirtualThreadFactory(name, pool) // use 
the pool as the scheduler
+          private val vtFactory = newVirtualThreadFactory(name, startNumber, 
pool) // use the pool as the scheduler
 
           override def newThread(r: Runnable): Thread = {
             val vt = vtFactory.newThread(r)
@@ -96,7 +105,7 @@ trait ExecutorServiceFactoryProvider {
             vt
           }
         }
-      case _ => newVirtualThreadFactory(prefixName, pool); // use the pool as 
the scheduler
+      case _ => newVirtualThreadFactory(prefixName, startNumber, pool); // use 
the pool as the scheduler
     }
     // wrap the pool with virtualized executor service
     new VirtualizedExecutorService(
diff --git 
a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala 
b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala
index 6b3532ba99..257da4d97b 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala
@@ -28,7 +28,7 @@ import pekko.util.JavaVersion
 
 @InternalApi
 private[dispatch] object VirtualThreadSupport {
-  val zero = java.lang.Long.valueOf(0L)
+  private val zero = java.lang.Long.valueOf(0L)
   private val lookup = MethodHandles.publicLookup()
 
   /**
@@ -57,53 +57,57 @@ private[dispatch] object VirtualThreadSupport {
 
   /**
    * Create a virtual thread factory with the default Virtual Thread executor.
+   * @param prefix the prefix of the virtual thread name.
+   * @param start the starting number of the virtual thread name, if -1, the 
number will not be appended.
    */
-  def newVirtualThreadFactory(prefix: String): ThreadFactory = {
-    require(isSupported, "Virtual thread is not supported.")
-    try {
-      val builderClass = 
ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder")
-      val ofVirtualClass = 
ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder$OfVirtual")
-      val ofVirtualMethod = lookup.findStatic(classOf[Thread], "ofVirtual", 
MethodType.methodType(ofVirtualClass))
-      var builder = ofVirtualMethod.invoke()
-      val nameMethod = lookup.findVirtual(ofVirtualClass, "name",
-        MethodType.methodType(ofVirtualClass, classOf[String], classOf[Long]))
-      // TODO support replace scheduler when we drop Java 8 support
-      val factoryMethod = lookup.findVirtual(builderClass, "factory", 
MethodType.methodType(classOf[ThreadFactory]))
-      builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", zero)
-      factoryMethod.invoke(builder).asInstanceOf[ThreadFactory]
-    } catch {
-      case NonFatal(e) =>
-        // --add-opens java.base/java.lang=ALL-UNNAMED
-        throw new UnsupportedOperationException("Failed to create virtual 
thread factory", e)
-    }
+  def newVirtualThreadFactory(prefix: String, start: Int): ThreadFactory = {
+    newVirtualThreadFactory(prefix, start, null)
   }
 
   /**
-   * Create a virtual thread factory with the specified executor as the 
scheduler of virtual thread.
+   * Create a virtual thread factory with the default Virtual Thread executor.
+   * @param prefix the prefix of the virtual thread name.
+   * @param start the starting number of the virtual thread name, if -1, the 
number will not be appended.
+   * @param executor the executor to be used as the scheduler of virtual 
thread. If null, the default scheduler will be used.
    */
-  def newVirtualThreadFactory(prefix: String, executor: ExecutorService): 
ThreadFactory =
+  def newVirtualThreadFactory(prefix: String, start: Int, executor: 
ExecutorService): ThreadFactory = {
+    require(isSupported, "Virtual thread is not supported.")
+    require(prefix != null && prefix.nonEmpty, "prefix should not be null or 
empty.")
     try {
       val builderClass = 
ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder")
       val ofVirtualClass = 
ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder$OfVirtual")
-      val ofVirtualMethod = classOf[Thread].getDeclaredMethod("ofVirtual")
-      var builder = ofVirtualMethod.invoke(null)
+      val ofVirtualMethod = lookup.findStatic(classOf[Thread], "ofVirtual", 
MethodType.methodType(ofVirtualClass))
+      var builder = ofVirtualMethod.invoke()
+      // set the name
+      if (start <= -1) {
+        val nameMethod = lookup.findVirtual(ofVirtualClass, "name",
+          MethodType.methodType(ofVirtualClass, classOf[String]))
+        builder = nameMethod.invoke(builder, prefix + "-virtual-thread")
+      } else {
+        val nameMethod = lookup.findVirtual(ofVirtualClass, "name",
+          MethodType.methodType(ofVirtualClass, classOf[String], 
classOf[Long]))
+        builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", zero)
+      }
+      // set the scheduler
       if (executor ne null) {
+        // Use reflection here, method handle is stricter on access control
         val clazz = builder.getClass
         val field = clazz.getDeclaredField("scheduler")
         field.setAccessible(true)
         field.set(builder, executor)
       }
-      val nameMethod = ofVirtualClass.getDeclaredMethod("name", 
classOf[String], classOf[Long])
-      val factoryMethod = builderClass.getDeclaredMethod("factory")
-      builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", zero)
+      val factoryMethod = lookup.findVirtual(builderClass, "factory", 
MethodType.methodType(classOf[ThreadFactory]))
       factoryMethod.invoke(builder).asInstanceOf[ThreadFactory]
     } catch {
       case NonFatal(e) =>
         // --add-opens java.base/java.lang=ALL-UNNAMED
         throw new UnsupportedOperationException("Failed to create virtual 
thread factory", e)
     }
+  }
 
   object CarrierThreadFactory extends ForkJoinPool.ForkJoinWorkerThreadFactory 
{
+    // --add-opens java.base/java.lang=ALL-UNNAMED
+    // --add-opens java.base/jdk.internal.misc=ALL-UNNAMED
     private val clazz = 
ClassLoader.getSystemClassLoader.loadClass("jdk.internal.misc.CarrierThread")
     // TODO lookup.findClass is only available in Java 9
     private val constructor = 
clazz.getDeclaredConstructor(classOf[ForkJoinPool])


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to