This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/1.7.x by this push:
new 7aac04c3a1 fix: support setting the ForkJoinPool minimum-runnable value (#2890) (#3037)
7aac04c3a1 is described below
commit 7aac04c3a1439acad865149ddc7ccb3411b3c4cd
Author: PJ Fanning <[email protected]>
AuthorDate: Sat Jun 6 19:37:07 2026 +0100
fix: support setting the ForkJoinPool minimum-runnable value (#2890) (#3037)
* fix: auto-tune ForkJoinPool minimum-runnable on JDK 21+ (#2890)
* fix: auto-tune ForkJoinPool minimum-runnable on JDK 21+
Motivation:
JDK-8300995 / JDK-8321335 changed compensation-thread creation in
ForkJoinPool asyncMode (FIFO) to be much more conservative. Pekko fork-join
dispatchers using the prior default `minimum-runnable = 1` are then prone
to starvation under blocking workloads on JDK 21+, which has shown up as
flaky nightly runs (#2870) and is the root cause behind the workflow
override added in #2889.
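For context, `minimum-runnable` maps onto the `minimumRunnable` argument of the
JDK 9+ `ForkJoinPool` constructor, as the `PekkoJdk9ForkJoinPool` change in the
diff below shows. A minimal sketch of building such a pool directly follows. The
object name `ForkJoinMinimumRunnableSketch`, the 60 second keep-alive, and the
null handler are assumptions made for illustration, not what Pekko uses.

```scala
import java.util.concurrent.{ ForkJoinPool, TimeUnit }

// Hypothetical standalone sketch; Pekko builds its pool via PekkoJdk9ForkJoinPool instead.
object ForkJoinMinimumRunnableSketch {

  def newPool(parallelism: Int, minimumRunnable: Int): ForkJoinPool =
    new ForkJoinPool(
      parallelism,
      ForkJoinPool.defaultForkJoinWorkerThreadFactory,
      null, // no uncaught-exception handler in this sketch (assumption)
      true, // asyncMode = true, i.e. FIFO scheduling of forked tasks
      0, // corePoolSize, same literal the diff passes
      32767, // maximumPoolSize, the reference.conf default (0x7fff)
      minimumRunnable, // minimum number of unblocked threads the pool tries to keep
      null, // no saturation predicate
      60L, // keep-alive chosen for the sketch only (assumption)
      TimeUnit.SECONDS)

  def main(args: Array[String]): Unit = {
    val pool = newPool(parallelism = 8, minimumRunnable = 4)
    try println(s"parallelism=${pool.getParallelism} asyncMode=${pool.getAsyncMode}")
    finally pool.shutdown()
  }
}
```

A larger `minimumRunnable` asks the pool to keep more unblocked workers available, which is the knob this change exposes.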
Modification:
* Introduce `ForkJoinExecutorConfigurator.resolveMinimumRunnable`, an
internal helper that computes the effective `minimum-runnable` value
from the configured value, the dispatcher parallelism, and the running
JDK major version. A negative configured value (the new default `-1`)
triggers the JDK-aware policy: on JDK 21+ the value becomes
`min(8, max(1, parallelism / 2))`; on JDK < 21 it stays at `1`.
Non-negative values are honoured verbatim, so explicit `0` still
disables compensation entirely and explicit positive values (including
`1`) keep their existing meaning.
* Change
`pekko.actor.default-dispatcher.fork-join-executor.minimum-runnable`
in `reference.conf` to the sentinel `-1` and update the doc block to
describe the new auto-selection rule.
* Add `ForkJoinExecutorConfiguratorSpec` with three groups of assertions:
(1) pure-function matrix on `resolveMinimumRunnable`; (2) directional
checks asserting the auto policy strictly raises the value on JDK 21+
and never exceeds the documented cap of 8; (3) wiring integration that
builds a `ForkJoinExecutorServiceFactory` from a real dispatcher config
and verifies the resolved value reaches the factory (guarding against
regressions of the resolver wiring).
Result:
Production users on JDK 21+ now benefit from the same starvation
mitigation that #2889 bolted onto the nightly CI workflow. Source and
binary compatibility are preserved (constructor defaults stay at `1`,
no signature changes, no MiMa filter required). Users wanting to opt
out can set `minimum-runnable = 1` (or any explicit value) to restore
the previous behaviour.
* fix: address PR review feedback
* License header: replace abbreviated header on the new
ForkJoinExecutorConfiguratorSpec with the canonical Apache 2.0
header used by other clean-room test files in the project
(per pjfanning's review comment).
* Narrow auto-policy scope from JDK 21+ to JDK 25+: nightly evidence
shows the asyncMode (FIFO) compensation-thread regression
(JDK-8300995 / JDK-8321335) surfaces most clearly on the JDK 25
line, while JDK 21 has been running fine on the legacy default of
1 for years. Keep the default unchanged on JDK 21 to avoid a
silent behaviour change for users who are not affected.
* Document the new auto-tuning behaviour in both
docs/src/main/paradox/dispatchers.md (classic) and
docs/src/main/paradox/typed/dispatchers.md, including the opt-out
instructions.
* Update reference.conf doc comment, configurator scaladoc, and the
spec assertions / pending guards to reflect the JDK 25+ scope.
* compile issues
* change the default to 1
* Update ForkJoinExecutorConfigurator.scala
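
The policy this squashed commit ends up with can be sketched as a standalone
function. The object name `MinimumRunnableSketch` is hypothetical; the function
body mirrors `resolveMinimumRunnable` as it appears in the diff below, copied
here only to make the arithmetic easy to try out.

```scala
// Hypothetical standalone copy of the policy, for illustration only.
object MinimumRunnableSketch {

  def resolveMinimumRunnable(configured: Int, parallelism: Int, jdkMajorVersion: Int): Int =
    if (configured >= 0) configured // explicit values, including 0, are used verbatim
    else if (jdkMajorVersion >= 25) math.min(8, math.max(1, parallelism / 2)) // auto policy
    else 1 // legacy value on older JDKs

  def main(args: Array[String]): Unit = {
    // Negative (auto) setting on JDK 25: half the parallelism, clamped to [1, 8].
    for (p <- Seq(1, 2, 4, 8, 16, 64))
      println(s"auto, jdk=25, parallelism=$p -> ${resolveMinimumRunnable(-1, p, 25)}")
    // Negative (auto) setting on JDK 21: stays at the legacy value.
    println(s"auto, jdk=21, parallelism=16 -> ${resolveMinimumRunnable(-1, 16, 21)}")
    // Explicit settings ignore the JDK version.
    println(s"explicit 0 -> ${resolveMinimumRunnable(0, 16, 25)}")
    println(s"explicit 7 -> ${resolveMinimumRunnable(7, 16, 25)}")
  }
}
```

With the shipped default of `1`, nothing changes unless a dispatcher sets `minimum-runnable = -1` to opt in to the auto policy.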
---------
Co-authored-by: He-Pin(kerr) <[email protected]>
---
.../ForkJoinExecutorConfiguratorSpec.scala | 177 +++++++++++++++++++++
actor/src/main/resources/reference.conf | 18 +++
.../pekko/dispatch/PekkoJdk9ForkJoinPool.scala | 5 +-
.../dispatch/ForkJoinExecutorConfigurator.scala | 48 +++++-
docs/src/main/paradox/dispatchers.md | 7 +
docs/src/main/paradox/typed/dispatchers.md | 7 +
6 files changed, 252 insertions(+), 10 deletions(-)
diff --git a/actor-tests/src/test/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfiguratorSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfiguratorSpec.scala
new file mode 100644
index 0000000000..5bf7c12255
--- /dev/null
+++ b/actor-tests/src/test/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfiguratorSpec.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.dispatch
+
+import java.util.concurrent.ThreadFactory
+
+import org.apache.pekko
+import pekko.testkit.PekkoSpec
+import pekko.util.JavaVersion
+
+import com.typesafe.config.{ Config, ConfigFactory }
+
+object ForkJoinExecutorConfiguratorSpec {
+ // Keep the root config explicit so the dispatcher-config integration checks below
+ // see exactly the values this spec is asserting on (and not whatever the project's
+ // global test configuration happens to set).
+ val config: Config = ConfigFactory.parseString("""
+ |fj-auto-default-dispatcher {
+ | executor = "fork-join-executor"
+ | fork-join-executor {
+ | parallelism-min = 8
+ | parallelism-factor = 1.0
+ | parallelism-max = 64
+ | }
+ |}
+ |fj-auto-small-dispatcher {
+ | executor = "fork-join-executor"
+ | fork-join-executor {
+ | parallelism-min = 1
+ | parallelism-factor = 1.0
+ | parallelism-max = 1
+ | }
+ |}
+ |fj-explicit-zero-dispatcher {
+ | executor = "fork-join-executor"
+ | fork-join-executor {
+ | parallelism-min = 8
+ | parallelism-factor = 1.0
+ | parallelism-max = 64
+ | minimum-runnable = 0
+ | }
+ |}
+ |fj-explicit-seven-dispatcher {
+ | executor = "fork-join-executor"
+ | fork-join-executor {
+ | parallelism-min = 8
+ | parallelism-factor = 1.0
+ | parallelism-max = 64
+ | minimum-runnable = 7
+ | }
+ |}
+ """.stripMargin)
+}
+
+class ForkJoinExecutorConfiguratorSpec extends PekkoSpec(ForkJoinExecutorConfiguratorSpec.config) {
+
+ import ForkJoinExecutorConfigurator.resolveMinimumRunnable
+
+ "ForkJoinExecutorConfigurator.resolveMinimumRunnable" must {
+
+ "honour explicit zero (compensation disabled)" in {
+ resolveMinimumRunnable(configured = 0, parallelism = 16, jdkMajorVersion = 25) shouldBe 0
+ resolveMinimumRunnable(configured = 0, parallelism = 16, jdkMajorVersion = 17) shouldBe 0
+ }
+
+ "honour explicit positive overrides verbatim" in {
+ resolveMinimumRunnable(configured = 1, parallelism = 16, jdkMajorVersion = 25) shouldBe 1
+ resolveMinimumRunnable(configured = 7, parallelism = 16, jdkMajorVersion = 25) shouldBe 7
+ resolveMinimumRunnable(configured = 100, parallelism = 16, jdkMajorVersion = 25) shouldBe 100
+ }
+
+ "auto-resolve to 1 on JDK < 25 regardless of parallelism (preserves legacy behaviour)" in {
+ // JDK 21 keeps the legacy default per reviewer guidance: only JDK 25 nightlies
+ // showed the compensation-thread regression badly enough to warrant a default change.
+ resolveMinimumRunnable(configured = -1, parallelism = 1, jdkMajorVersion = 17) shouldBe 1
+ resolveMinimumRunnable(configured = -1, parallelism = 8, jdkMajorVersion = 17) shouldBe 1
+ resolveMinimumRunnable(configured = -1, parallelism = 64, jdkMajorVersion = 11) shouldBe 1
+ resolveMinimumRunnable(configured = -1, parallelism = 16, jdkMajorVersion = 21) shouldBe 1
+ }
+
+ "auto-resolve using parallelism / 2 on JDK 25+ with min cap 1 and max cap 8" in {
+ resolveMinimumRunnable(configured = -1, parallelism = 1, jdkMajorVersion = 25) shouldBe 1
+ resolveMinimumRunnable(configured = -1, parallelism = 2, jdkMajorVersion = 25) shouldBe 1
+ resolveMinimumRunnable(configured = -1, parallelism = 4, jdkMajorVersion = 25) shouldBe 2
+ resolveMinimumRunnable(configured = -1, parallelism = 8, jdkMajorVersion = 25) shouldBe 4
+ resolveMinimumRunnable(configured = -1, parallelism = 16, jdkMajorVersion = 25) shouldBe 8
+ resolveMinimumRunnable(configured = -1, parallelism = 64, jdkMajorVersion = 25) shouldBe 8
+ }
+
+ "produce a strictly higher value on JDK 25+ than on JDK 21 for plausible dispatcher sizes" in {
+ // Directional check: the auto policy must move the needle on the JDK line that needs it
+ // (and only on that line — JDK 21 stays at the legacy default per reviewer guidance).
+ for (parallelism <- Seq(4, 8, 16, 32, 64)) {
+ val legacy = resolveMinimumRunnable(configured = -1, parallelism, jdkMajorVersion = 21)
+ val modern = resolveMinimumRunnable(configured = -1, parallelism, jdkMajorVersion = 25)
+ withClue(s"parallelism=$parallelism legacy=$legacy modern=$modern: ") {
+ modern should be > legacy
+ }
+ }
+ }
+
+ "never exceed the documented max cap of 8" in {
+ for (parallelism <- 1 to 256; jdk <- Seq(21, 25, 30)) {
+ resolveMinimumRunnable(configured = -1, parallelism, jdk) should be <= 8
+ }
+ }
+ }
+
+ "ForkJoinExecutorConfigurator wiring" must {
+
+ // Build a factory from a real dispatcher config and return the resolved
+ // minimum-runnable. This proves the config value actually reaches the
+ // ForkJoinExecutorServiceFactory — guarding against the trivial regression
+ // of reverting resolveMinimumRunnable to a direct `config.getInt` read.
+ def resolvedMinimumRunnable(dispatcherId: String): Int = {
+ // `system.dispatchers.config(id)` resolves the dispatcher's full config with
+ // reference.conf defaults applied (so `virtualize`, `minimum-runnable`, etc.
+ // all have values).
+ val dispatcherConfig = system.dispatchers.config(dispatcherId)
+ val configurator = new ForkJoinExecutorConfigurator(
+ dispatcherConfig.getConfig("fork-join-executor"),
+ system.dispatchers.prerequisites)
+ val tf: ThreadFactory = system.dispatchers.prerequisites.threadFactory
+ val factory = configurator
+ .createExecutorServiceFactory(dispatcherId, tf)
+ .asInstanceOf[configurator.ForkJoinExecutorServiceFactory]
+ factory.minimumRunnable
+ }
+
+ "respect explicit minimum-runnable = 0" in {
+ resolvedMinimumRunnable("fj-explicit-zero-dispatcher") shouldBe 0
+ }
+
+ "respect explicit minimum-runnable = 7" in {
+ resolvedMinimumRunnable("fj-explicit-seven-dispatcher") shouldBe 7
+ }
+
+ "auto-scale the default (minimum-runnable not set) on JDK 25+" in {
+ if (JavaVersion.majorVersion < 25) pending
+
+ val resolved = resolvedMinimumRunnable("fj-auto-default-dispatcher")
+ // The dispatcher declares parallelism-min = 8 so effective parallelism is at
+ // least 8; auto = min(8, max(1, parallelism/2)) must be at least 4 and never
+ // exceed the documented cap of 8.
+ resolved should be >= 4
+ resolved should be <= 8
+ }
+
+ "keep the legacy value of 1 on JDK < 25 when the default is left untouched" in {
+ if (JavaVersion.majorVersion >= 25) pending
+
+ resolvedMinimumRunnable("fj-auto-default-dispatcher") shouldBe 1
+ }
+
+ "never drop below 1 even for parallelism = 1 dispatchers" in {
+ val resolved = resolvedMinimumRunnable("fj-auto-small-dispatcher")
+ // parallelism = 1 implies parallelism/2 = 0, which the min-cap must lift to 1.
+ // On JDK < 25 the legacy value of 1 is already the expected answer.
+ resolved shouldBe 1
+ }
+ }
+}
diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf
index 5ae5aaaa93..ae0ab55625 100644
--- a/actor/src/main/resources/reference.conf
+++ b/actor/src/main/resources/reference.conf
@@ -488,6 +488,24 @@ pekko {
# Read the documentation on `java.util.concurrent.ForkJoinPool` to find out more. Default in hex is 0x7fff.
maximum-pool-size = 32767
+ # Minimum number of non-blocked (runnable) worker threads the pool tries to maintain.
+ # When blocked worker count causes active threads to drop below this threshold, the pool
+ # may create a compensation thread to maintain progress.
+ # In Pekko 1.x, the default value is 1 for backward compatibility, but this is not ideal
+ # on newer JDKs.
+ #
+ # The special value -1 selects a JDK-aware policy:
+ # * JDK 25+ : effective value = min(8, max(1, parallelism / 2))
+ # * JDK < 25: effective value = 1 (preserves the JDK behaviour prior to this setting)
+ # Auto-selection on JDK 25+ mitigates the asyncMode (FIFO) compensation-thread regression
+ # tracked in JDK-8300995 / JDK-8321335 that, in Pekko nightly tests, surfaces most clearly
+ # on the JDK 25 line and can cause actor-heavy workloads to starve.
+ #
+ # Set explicitly to 0 to disable compensation entirely. If you set the value to any
+ # non-negative integer then that value will be used as the effective minimum-runnable
+ # threshold.
+ minimum-runnable = 1
+
# This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above,
# When set to `on` but the underlying runtime does not support virtual threads, an Exception will be thrown.
# Virtualize this dispatcher as a virtual-thread-executor
diff --git a/actor/src/main/scala-jdk-9/org/apache/pekko/dispatch/PekkoJdk9ForkJoinPool.scala b/actor/src/main/scala-jdk-9/org/apache/pekko/dispatch/PekkoJdk9ForkJoinPool.scala
index 8e089d16a3..adb49072d0 100644
--- a/actor/src/main/scala-jdk-9/org/apache/pekko/dispatch/PekkoJdk9ForkJoinPool.scala
+++ b/actor/src/main/scala-jdk-9/org/apache/pekko/dispatch/PekkoJdk9ForkJoinPool.scala
@@ -35,9 +35,10 @@ private[dispatch] final class PekkoJdk9ForkJoinPool(
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
maximumPoolSize: Int,
unhandledExceptionHandler: Thread.UncaughtExceptionHandler,
- asyncMode: Boolean)
+ asyncMode: Boolean,
+ minimumRunnable: Int)
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode,
- 0, maximumPoolSize, 1, null, ForkJoinPoolConstants.DefaultKeepAliveMillis, TimeUnit.MILLISECONDS)
+ 0, maximumPoolSize, minimumRunnable, null, ForkJoinPoolConstants.DefaultKeepAliveMillis, TimeUnit.MILLISECONDS)
with LoadMetrics {
override def execute(r: Runnable): Unit =
diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
index c7d47ced32..903641bbb5 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
@@ -15,6 +15,7 @@ package org.apache.pekko.dispatch
import com.typesafe.config.Config
import org.apache.pekko
+import pekko.annotation.InternalApi
import pekko.dispatch.VirtualThreadSupport.newVirtualThreadFactory
import pekko.util.JavaVersion
@@ -24,6 +25,29 @@ import scala.util.Try
object ForkJoinExecutorConfigurator {
+ /**
+ * INTERNAL API
+ *
+ * Resolves the effective `minimum-runnable` value for a fork-join dispatcher.
+ * In Pekko 1.x, the default value for `minimum-runnable` is `1` for backward
+ * compatibility, but this is not ideal on newer JDKs.
+ *
+ * A negative value selects the JDK-aware policy:
+ * on JDK 25+ the value is `min(8, max(1, parallelism / 2))` to mitigate the
+ * asyncMode (FIFO) compensation-thread regression tracked in
+ * JDK-8300995 / JDK-8321335 (the impact is most visible on the JDK 25 line in
+ * Pekko nightly tests); on older JDKs the value stays at `1` to preserve the
+ * pre-existing behaviour. Non-negative configured values are honoured verbatim,
+ * so `0` still disables compensation entirely.
+ */
+ @InternalApi private[pekko] def resolveMinimumRunnable(
+ configured: Int,
+ parallelism: Int,
+ jdkMajorVersion: Int): Int =
+ if (configured >= 0) configured
+ else if (jdkMajorVersion >= 25) math.min(8, math.max(1, parallelism / 2))
+ else 1
+
/**
* INTERNAL PEKKO USAGE ONLY
*/
@@ -92,7 +116,8 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
val parallelism: Int,
val asyncMode: Boolean,
- val maxPoolSize: Int)
+ val maxPoolSize: Int,
+ val minimumRunnable: Int = 1)
extends ExecutorServiceFactory {
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
parallelism: Int,
@@ -115,7 +140,8 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
val methodHandleLookup = MethodHandles.lookup()
val mt = MethodType.methodType(classOf[Unit], classOf[Int],
classOf[ForkJoinPool.ForkJoinWorkerThreadFactory],
- classOf[Int], classOf[Thread.UncaughtExceptionHandler], classOf[Boolean])
+ classOf[Int], classOf[Thread.UncaughtExceptionHandler],
+ classOf[Boolean], classOf[Int])
methodHandleLookup.findConstructor(clz, mt)
}
}
@@ -136,7 +162,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
val pool = pekkoJdk9ForkJoinPoolHandleOpt match {
case Some(handle) =>
// carrier Thread only exists in JDK 17+
- handle.invoke(parallelism, tf, maxPoolSize, MonitorableThreadFactory.doNothing, asyncMode)
+ handle.invoke(parallelism, tf, maxPoolSize, MonitorableThreadFactory.doNothing, asyncMode, minimumRunnable)
.asInstanceOf[ExecutorService with LoadMetrics]
case _ =>
new PekkoForkJoinPool(parallelism, tf, MonitorableThreadFactory.doNothing, asyncMode)
@@ -188,15 +214,21 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
""""task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LIFO".""")
}
+ val parallelism = ThreadPoolConfig.scaledPoolSize(
+ config.getInt("parallelism-min"),
+ config.getDouble("parallelism-factor"),
+ config.getInt("parallelism-max"))
+
new ForkJoinExecutorServiceFactory(
id,
validate(tf),
- ThreadPoolConfig.scaledPoolSize(
- config.getInt("parallelism-min"),
- config.getDouble("parallelism-factor"),
- config.getInt("parallelism-max")),
+ parallelism,
asyncMode,
- config.getInt("maximum-pool-size")
+ config.getInt("maximum-pool-size"),
+ ForkJoinExecutorConfigurator.resolveMinimumRunnable(
+ config.getInt("minimum-runnable"),
+ parallelism,
+ JavaVersion.majorVersion)
)
}
}
diff --git a/docs/src/main/paradox/dispatchers.md b/docs/src/main/paradox/dispatchers.md
index 37cd6eb1b6..3ee19c2f3e 100644
--- a/docs/src/main/paradox/dispatchers.md
+++ b/docs/src/main/paradox/dispatchers.md
@@ -44,6 +44,13 @@ You can read more about parallelism in the JDK's [ForkJoinPool documentation](ht
When Running on Java 9+, you can use `maximum-pool-size` to set the upper bound on the total number of threads allocated by the ForkJoinPool.
+In Pekko 1.7.0, we introduced a setting for the `fork-join-executor` called `minimum-runnable`. In Pekko 1.x, this defaults to `1` for backward compatibility reasons.
+If you override the default and set it to `-1`, this will raise the number of compensation threads the
+pool will create when workers block, mitigating the asyncMode (FIFO) compensation regression tracked in
+[JDK-8300995](https://bugs.openjdk.org/browse/JDK-8300995) / [JDK-8321335](https://bugs.openjdk.org/browse/JDK-8321335).
+In Pekko nightly tests, this surfaces most clearly on the JDK 25 line and can cause actor-heavy workloads to starve.
+You can set an explicit non-negative integer `minimum-runnable` value to opt out of the auto-selection and control the exact number of compensation threads. Setting `minimum-runnable = 0` disables compensation entirely.
+
**Experimental**: When Running on Java 21+, you can use `virtualize=on` to enable the virtual threads feature.
When using virtual threads, all virtual threads will use the same `unparker`, so you may want to
increase the number of `jdk.unparker.maxPoolSize`.
diff --git a/docs/src/main/paradox/typed/dispatchers.md b/docs/src/main/paradox/typed/dispatchers.md
index 8f811ec81f..2b4c9051b0 100644
--- a/docs/src/main/paradox/typed/dispatchers.md
+++ b/docs/src/main/paradox/typed/dispatchers.md
@@ -129,6 +129,13 @@ You can read more about parallelism in the JDK's [ForkJoinPool documentation](ht
When Running on Java 9+, you can use `maximum-pool-size` to set the upper bound on the total number of threads allocated by the ForkJoinPool.
+In Pekko 1.7.0, we introduced a setting for the `fork-join-executor` called `minimum-runnable`. In Pekko 1.x, this defaults to `1` for backward compatibility reasons.
+If you override the default and set it to `-1`, this will raise the number of compensation threads the
+pool will create when workers block, mitigating the asyncMode (FIFO) compensation regression tracked in
+[JDK-8300995](https://bugs.openjdk.org/browse/JDK-8300995) / [JDK-8321335](https://bugs.openjdk.org/browse/JDK-8321335).
+In Pekko nightly tests, this surfaces most clearly on the JDK 25 line and can cause actor-heavy workloads to starve.
+You can set an explicit non-negative integer `minimum-runnable` value to opt out of the auto-selection and control the exact number of compensation threads. Setting `minimum-runnable = 0` disables compensation entirely.
+
**Experimental**: When Running on Java 21+, you can use `virtualize=on` to enable the virtual threads feature.
When using virtual threads, all virtual threads will use the same `unparker`, so you may want to
increase the number of `jdk.unparker.maxPoolSize`.