This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new e3e01889c8 feat: Support setting starting number of virtual thread.
(#2242)
e3e01889c8 is described below
commit e3e01889c8931982ed7b6e40adb9d1f42ab35508
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Thu Sep 25 23:59:00 2025 +0800
feat: Support setting starting number of virtual thread. (#2242)
---
.../dispatch/ForkJoinPoolVirtualThreadSpec.scala | 21 ++++++++
.../dispatch/ThreadPoolVirtualThreadSpec.scala | 22 ++++++++-
.../dispatch/VirtualThreadPoolDispatcherSpec.scala | 2 +-
actor/src/main/resources/reference.conf | 18 ++++++-
.../apache/pekko/dispatch/AbstractDispatcher.scala | 16 +++----
.../dispatch/ForkJoinExecutorConfigurator.scala | 8 ++--
.../apache/pekko/dispatch/ThreadPoolBuilder.scala | 19 ++++++--
.../pekko/dispatch/VirtualThreadSupport.scala | 56 ++++++++++++----------
8 files changed, 115 insertions(+), 47 deletions(-)
diff --git
a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala
b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala
index 550f34ef90..dd97a74a0d 100644
---
a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala
+++
b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala
@@ -35,6 +35,19 @@ object ForkJoinPoolVirtualThreadSpec {
| parallelism-max = 2
| parallelism-min = 2
| virtualize = on
+ | virtual-thread-start-number = 0
+ | }
+ | }
+ | task-dispatcher-short {
+ | mailbox-type =
"org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
+ | throughput = 5
+ | executor = "fork-join-executor"
+ | fork-join-executor {
+ | parallelism-factor = 2
+ | parallelism-max = 2
+ | parallelism-min = 2
+ | virtualize = on
+ | virtual-thread-start-number = -1
| }
| }
|}
@@ -63,6 +76,14 @@ class ForkJoinPoolVirtualThreadSpec extends
PekkoSpec(ForkJoinPoolVirtualThreadS
name should
include("ForkJoinPoolVirtualThreadSpec-custom.task-dispatcher-virtual-thread-")
}
}
+
+ val actor2 = system.actorOf(Props(new
ThreadNameActor).withDispatcher("custom.task-dispatcher-short"))
+ for (_ <- 1 to 1000) {
+ actor2 ! "ping"
+ expectMsgPF() { case name: String =>
+ name should
include("ForkJoinPoolVirtualThreadSpec-custom.task-dispatcher-short-virtual-thread")
+ }
+ }
}
}
diff --git
a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ThreadPoolVirtualThreadSpec.scala
b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ThreadPoolVirtualThreadSpec.scala
index 406730c233..157174ca88 100644
---
a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ThreadPoolVirtualThreadSpec.scala
+++
b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ThreadPoolVirtualThreadSpec.scala
@@ -31,8 +31,20 @@ object ThreadPoolVirtualThreadSpec {
| throughput = 5
| executor = "thread-pool-executor"
| thread-pool-executor {
- | fixed-pool-size = 4
+ | fixed-pool-size = 1
| virtualize = on
+ | virtual-thread-start-number = 0
+ | }
+ | }
+ |
+ | task-dispatcher-short {
+ | mailbox-type =
"org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
+ | throughput = 5
+ | executor = "thread-pool-executor"
+ | thread-pool-executor {
+ | fixed-pool-size = 1
+ | virtualize = on
+ | virtual-thread-start-number = -1
| }
| }
|}
@@ -61,6 +73,14 @@ class ThreadPoolVirtualThreadSpec extends
PekkoSpec(ThreadPoolVirtualThreadSpec.
name should
include("ThreadPoolVirtualThreadSpec-custom.task-dispatcher-virtual-thread-")
}
}
+
+ val actor2 = system.actorOf(Props(new
ThreadNameActor).withDispatcher("custom.task-dispatcher-short"))
+ for (_ <- 1 to 1000) {
+ actor2 ! "ping"
+ expectMsgPF() { case name: String =>
+ name should
include("ThreadPoolVirtualThreadSpec-custom.task-dispatcher-short-virtual-thread")
+ }
+ }
}
}
diff --git
a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala
b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala
index 2c8266c6fa..a28607485e 100644
---
a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala
+++
b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala
@@ -50,7 +50,7 @@ class VirtualThreadPoolDispatcherSpec extends
PekkoSpec(VirtualThreadPoolDispatc
for (_ <- 1 to 1000) {
innocentActor ! "ping"
expectMsgPF() { case name: String =>
- name should
include("VirtualThreadPoolDispatcherSpec-virtual-thread-dispatcher-virtual-thread-")
+ name should
include("VirtualThreadPoolDispatcherSpec-virtual-thread-dispatcher-virtual-thread")
}
}
}
diff --git a/actor/src/main/resources/reference.conf
b/actor/src/main/resources/reference.conf
index 740c2975b4..9b12fb106b 100644
--- a/actor/src/main/resources/reference.conf
+++ b/actor/src/main/resources/reference.conf
@@ -492,6 +492,10 @@ pekko {
# --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
# --add-opens=java.base/java.lang=ALL-UNNAMED
virtualize = off
+
+ # When virtualize = on, you can set the starting number for virtual
threads created by this dispatcher.
+ # -1 (default) to turn off, 0 to start from 0, or any positive number
to start from that number.
+ virtual-thread-start-number = -1
}
# This will be used if you have set "executor = "thread-pool-executor""
@@ -529,7 +533,7 @@ pekko {
# ceil(available processors * factor)
# The maximumPoolSize will not be less than corePoolSize.
# It is only used if using a bounded task queue.
- max-pool-size-factor = 3.0
+ max-pool-size-factor = 3.0
# Max number of threads to cap factor-based maximumPoolSize number to
max-pool-size-max = 64
@@ -555,6 +559,10 @@ pekko {
# --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
# --add-opens=java.base/java.lang=ALL-UNNAMED
virtualize = off
+
+ # When virtualize = on, you can set the starting number for virtual
threads created by this dispatcher.
+ # -1 (default) to turn off, 0 to start from 0, or any positive number
to start from that number.
+ virtual-thread-start-number = -1
}
# This will be used if you have set "executor = "virtual-thread-executor"
@@ -569,6 +577,10 @@ pekko {
#jdk.virtualThreadScheduler.minRunnable
#jdk.unparker.maxPoolSize
fallback = "fork-join-executor"
+
+ # When using virtual-thread-executor, you can set the starting number
for virtual threads created by this dispatcher.
+ # -1 (default) to turn off, 0 to start from 0, or any positive number
to start from that number.
+ virtual-thread-start-number = -1
}
# How long time the dispatcher will wait for new actors until it shuts
down
shutdown-timeout = 1s
@@ -623,6 +635,10 @@ pekko {
# --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
# --add-opens=java.base/java.lang=ALL-UNNAMED
virtualize = off
+
+ # When virtualize = on, you can set the starting number for virtual
threads created by this dispatcher.
+ # -1 (default) to turn off, 0 to start from 0, or any positive number
to start from that number.
+ virtual-thread-start-number = -1
}
}
diff --git
a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
index 4c4e74d448..142ac836fe 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
@@ -28,7 +28,7 @@ import pekko.dispatch.affinity.AffinityPoolConfigurator
import pekko.dispatch.sysmsg._
import pekko.event.EventStream
import pekko.event.Logging.{ emptyMDC, Debug, Error, LogEventException,
Warning }
-import pekko.util.{ Index, JavaVersion }
+import pekko.util.Index
import com.typesafe.config.Config
@@ -412,14 +412,14 @@ abstract class MessageDispatcherConfigurator(_config:
Config, val prerequisites:
final class VirtualThreadExecutorConfigurator(config: Config, prerequisites:
DispatcherPrerequisites)
extends ExecutorServiceConfigurator(config, prerequisites) {
- override def isVirtualized: Boolean = true
-
+ override def isVirtualized: Boolean = VirtualThreadSupport.isSupported
+ override def virtualThreadStartNumber: Int =
config.getInt("virtual-thread-start-number")
override def createExecutorServiceFactory(id: String, threadFactory:
ThreadFactory): ExecutorServiceFactory = {
import VirtualThreadSupport._
val tf: ThreadFactory = threadFactory match {
case MonitorableThreadFactory(name, _, contextClassLoader,
exceptionHandler, _) =>
new ThreadFactory {
- private val vtFactory = newVirtualThreadFactory(name + "-" + id)
+ private val vtFactory = newVirtualThreadFactory(name + "-" + id,
virtualThreadStartNumber)
override def newThread(r: Runnable): Thread = {
val vt = vtFactory.newThread(r)
@@ -428,7 +428,7 @@ final class VirtualThreadExecutorConfigurator(config:
Config, prerequisites: Dis
vt
}
}
- case _ => newVirtualThreadFactory(prerequisites.settings.name + "-" +
id);
+ case _ => newVirtualThreadFactory(prerequisites.settings.name + "-" +
id, virtualThreadStartNumber);
}
new ExecutorServiceFactory {
override def createExecutorService: ExecutorService with LoadMetrics = {
@@ -502,7 +502,7 @@ trait ThreadPoolExecutorServiceFactoryProvider extends
ExecutorServiceFactoryPro
case m: MonitorableThreadFactory => m.name + "-" + id
case _ => id
}
- createVirtualized(tf, pool, prefixName)
+ createVirtualized(tf, pool, prefixName, virtualThreadStartNumber)
} else pool
}
}
@@ -514,8 +514,8 @@ class ThreadPoolExecutorConfigurator(config: Config,
prerequisites: DispatcherPr
extends ExecutorServiceConfigurator(config, prerequisites)
with ThreadPoolExecutorServiceFactoryProvider {
override val threadPoolConfig: ThreadPoolConfig =
createThreadPoolConfigBuilder(config, prerequisites).config
- override val isVirtualized: Boolean = threadPoolConfig.isVirtualized &&
JavaVersion.majorVersion >= 21
-
+ override val isVirtualized: Boolean = threadPoolConfig.isVirtualized &&
VirtualThreadSupport.isSupported
+ override def virtualThreadStartNumber: Int =
config.getInt("virtual-thread-start-number")
protected def createThreadPoolConfigBuilder(
config: Config,
@nowarn("msg=never used") prerequisites: DispatcherPrerequisites):
ThreadPoolConfigBuilder = {
diff --git
a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
index 7aa1aeabdd..e5d7e7f8ce 100644
---
a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
+++
b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
@@ -15,9 +15,6 @@ package org.apache.pekko.dispatch
import java.util.concurrent.{ ExecutorService, ForkJoinPool, ForkJoinTask,
ThreadFactory, TimeUnit }
-import org.apache.pekko
-import pekko.util.JavaVersion
-
import com.typesafe.config.Config
object ForkJoinExecutorConfigurator {
@@ -73,7 +70,8 @@ object ForkJoinExecutorConfigurator {
class ForkJoinExecutorConfigurator(config: Config, prerequisites:
DispatcherPrerequisites)
extends ExecutorServiceConfigurator(config, prerequisites) {
import ForkJoinExecutorConfigurator._
- final override val isVirtualized: Boolean = config.getBoolean("virtualize")
&& JavaVersion.majorVersion >= 21
+ final override val isVirtualized: Boolean = config.getBoolean("virtualize")
&& VirtualThreadSupport.isSupported
+ override def virtualThreadStartNumber: Int =
config.getInt("virtual-thread-start-number")
def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = t
match {
case correct: ForkJoinPool.ForkJoinWorkerThreadFactory => correct
@@ -115,7 +113,7 @@ class ForkJoinExecutorConfigurator(config: Config,
prerequisites: DispatcherPrer
if (isVirtualized) {
// we need to cast here,
- createVirtualized(threadFactory.asInstanceOf[ThreadFactory], pool, id)
+ createVirtualized(threadFactory.asInstanceOf[ThreadFactory], pool, id,
virtualThreadStartNumber)
} else {
pool
}
diff --git
a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
index 093c5645f1..e1321c7cab 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
@@ -77,17 +77,26 @@ trait ExecutorServiceFactory {
*/
trait ExecutorServiceFactoryProvider {
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory):
ExecutorServiceFactory
+
+ /**
+ * Whether the executor service created by this factory should use virtual
threads.
+ */
def isVirtualized: Boolean = false // can be overridden by implementations
- protected def createVirtualized(
- threadFactory: ThreadFactory,
+ /**
+ * The starting number of the virtual thread name, if -1, the number will
not be appended.
+ */
+ def virtualThreadStartNumber: Int = 0 // can be overridden by implementations
+
+ protected def createVirtualized(threadFactory: ThreadFactory,
pool: ExecutorService with LoadMetrics,
- prefixName: String): ExecutorService = {
+ prefixName: String,
+ startNumber: Int): ExecutorService = {
// when virtualized, we need enhanced thread factory
val factory: ThreadFactory = threadFactory match {
case MonitorableThreadFactory(name, _, contextClassLoader,
exceptionHandler, _) =>
new ThreadFactory {
- private val vtFactory = newVirtualThreadFactory(name, pool) // use
the pool as the scheduler
+ private val vtFactory = newVirtualThreadFactory(name, startNumber,
pool) // use the pool as the scheduler
override def newThread(r: Runnable): Thread = {
val vt = vtFactory.newThread(r)
@@ -96,7 +105,7 @@ trait ExecutorServiceFactoryProvider {
vt
}
}
- case _ => newVirtualThreadFactory(prefixName, pool); // use the pool as
the scheduler
+ case _ => newVirtualThreadFactory(prefixName, startNumber, pool); // use
the pool as the scheduler
}
// wrap the pool with virtualized executor service
new VirtualizedExecutorService(
diff --git
a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala
index 6b3532ba99..257da4d97b 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala
@@ -28,7 +28,7 @@ import pekko.util.JavaVersion
@InternalApi
private[dispatch] object VirtualThreadSupport {
- val zero = java.lang.Long.valueOf(0L)
+ private val zero = java.lang.Long.valueOf(0L)
private val lookup = MethodHandles.publicLookup()
/**
@@ -57,53 +57,57 @@ private[dispatch] object VirtualThreadSupport {
/**
* Create a virtual thread factory with the default Virtual Thread executor.
+ * @param prefix the prefix of the virtual thread name.
+ * @param start the starting number of the virtual thread name, if -1, the
number will not be appended.
*/
- def newVirtualThreadFactory(prefix: String): ThreadFactory = {
- require(isSupported, "Virtual thread is not supported.")
- try {
- val builderClass =
ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder")
- val ofVirtualClass =
ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder$OfVirtual")
- val ofVirtualMethod = lookup.findStatic(classOf[Thread], "ofVirtual",
MethodType.methodType(ofVirtualClass))
- var builder = ofVirtualMethod.invoke()
- val nameMethod = lookup.findVirtual(ofVirtualClass, "name",
- MethodType.methodType(ofVirtualClass, classOf[String], classOf[Long]))
- // TODO support replace scheduler when we drop Java 8 support
- val factoryMethod = lookup.findVirtual(builderClass, "factory",
MethodType.methodType(classOf[ThreadFactory]))
- builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", zero)
- factoryMethod.invoke(builder).asInstanceOf[ThreadFactory]
- } catch {
- case NonFatal(e) =>
- // --add-opens java.base/java.lang=ALL-UNNAMED
- throw new UnsupportedOperationException("Failed to create virtual
thread factory", e)
- }
+ def newVirtualThreadFactory(prefix: String, start: Int): ThreadFactory = {
+ newVirtualThreadFactory(prefix, start, null)
}
/**
- * Create a virtual thread factory with the specified executor as the
scheduler of virtual thread.
+ * Create a virtual thread factory, optionally scheduled on the given executor.
+ * @param prefix the prefix of the virtual thread name.
+ * @param start the starting number of the virtual thread name; if negative, no
number is appended.
+ * @param executor the executor to be used as the scheduler of virtual
thread. If null, the default scheduler will be used.
*/
- def newVirtualThreadFactory(prefix: String, executor: ExecutorService):
ThreadFactory =
+ def newVirtualThreadFactory(prefix: String, start: Int, executor:
ExecutorService): ThreadFactory = {
+ require(isSupported, "Virtual thread is not supported.")
+ require(prefix != null && prefix.nonEmpty, "prefix should not be null or
empty.")
try {
val builderClass =
ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder")
val ofVirtualClass =
ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder$OfVirtual")
- val ofVirtualMethod = classOf[Thread].getDeclaredMethod("ofVirtual")
- var builder = ofVirtualMethod.invoke(null)
+ val ofVirtualMethod = lookup.findStatic(classOf[Thread], "ofVirtual",
MethodType.methodType(ofVirtualClass))
+ var builder = ofVirtualMethod.invoke()
+ // set the name
+ if (start <= -1) {
+ val nameMethod = lookup.findVirtual(ofVirtualClass, "name",
+ MethodType.methodType(ofVirtualClass, classOf[String]))
+ builder = nameMethod.invoke(builder, prefix + "-virtual-thread")
+ } else {
+ val nameMethod = lookup.findVirtual(ofVirtualClass, "name",
+ MethodType.methodType(ofVirtualClass, classOf[String],
classOf[Long]))
+ builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", zero)
+ }
+ // set the scheduler
if (executor ne null) {
+ // Use reflection here, method handle is stricter on access control
val clazz = builder.getClass
val field = clazz.getDeclaredField("scheduler")
field.setAccessible(true)
field.set(builder, executor)
}
- val nameMethod = ofVirtualClass.getDeclaredMethod("name",
classOf[String], classOf[Long])
- val factoryMethod = builderClass.getDeclaredMethod("factory")
- builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", zero)
+ val factoryMethod = lookup.findVirtual(builderClass, "factory",
MethodType.methodType(classOf[ThreadFactory]))
factoryMethod.invoke(builder).asInstanceOf[ThreadFactory]
} catch {
case NonFatal(e) =>
// --add-opens java.base/java.lang=ALL-UNNAMED
throw new UnsupportedOperationException("Failed to create virtual
thread factory", e)
}
+ }
object CarrierThreadFactory extends ForkJoinPool.ForkJoinWorkerThreadFactory
{
+ // --add-opens java.base/java.lang=ALL-UNNAMED
+ // --add-opens java.base/jdk.internal.misc=ALL-UNNAMED
private val clazz =
ClassLoader.getSystemClassLoader.loadClass("jdk.internal.misc.CarrierThread")
// TODO lookup.findClass is only available in Java 9
private val constructor =
clazz.getDeclaredConstructor(classOf[ForkJoinPool])
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]