This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new ebddbc2112 test: stabilize two flaky tests on JDK21+/virtualized
nightly (#2949)
ebddbc2112 is described below
commit ebddbc2112059e7aa74de7d80589a2085401c94e
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat May 9 18:49:46 2026 +0800
test: stabilize two flaky tests on JDK21+/virtualized nightly (#2949)
* test: stabilize resizer performance log sampling
Motivation:
The "record the performance log with the correct pool size" test in
MetricsBasedResizerSpec is intermittently flaky on JDK21+/JDK25 nightly
runs (e.g. apache/pekko#25586760215, ~12s before failing on JDK 25 /
Scala 2.13.x with virtualized dispatchers). Between the two
reportMessageCount checkpoints the test sent only one extra mockSend
per routee. With virtual-thread scheduling those extra messages can be
processed before the second reportMessageCount runs, leaving routees
idle (currentMessage = null) so messagesInRoutees underestimates the
queue and the resizer's fullyUtilized check falls through, no
performance log is recorded, and the final assertion fails with
"None was empty".
Modification:
Replace the inter-checkpoint mockSend pair with a second sendToAll
call (await=false) and Await.ready on its first latch. This keeps both
routees actively holding their currentMessage when reportMessageCount
runs, so the resizer always observes the routees as fully utilized
across the two checkpoints.
Result:
The test no longer relies on routees still being busy by chance after
their previous batch completes, and runs deterministically on
virtualized dispatchers.
* test: drop racy pre-shutdown closedCounter check in autocloseable
abrupt-termination test
Motivation:
"will close the autocloseable resource on abrupt materializer
termination" in FlowMapWithResourceSpec is intermittently flaky on
JDK 21 / Scala 3.3.x nightly runs (failing in 22-115ms with
"1 was not equal to 0" at FlowMapWithResourceSpec.scala:538).
mapWithResource carries the IODispatcher attribute which lands the
operator on a different dispatcher than Source.single and Sink.never,
introducing an async-island bridge whose SubSink can pull eagerly.
Once that pull propagates back, Source.single pushes its element and
completes, StatefulMap.onUpstreamFinish fires, closeStateAndComplete
runs the close callback and the AutoCloseable counter reaches 1 before
the test thread issues mat.shutdown(). The pre-shutdown
"closedCounter.get shouldBe 0" therefore loses the race on fast
JDK21+ scheduling.
Modification:
Remove the pre-shutdown closedCounter assertion. The test still proves
the abrupt-termination contract via the Await on created.future
(ensuring create() ran), the AbruptTerminationException assertion on
the materialized value, the Await on promise.future (ensuring close()
ran) and the post-shutdown closedCounter == 1 check.
Result:
Removes a timing assumption that doesn't reflect the operator's
contract, while keeping the close-on-abrupt-termination guarantee
under verification.
---
.../org/apache/pekko/routing/MetricsBasedResizerSpec.scala | 11 ++++-------
.../pekko/stream/scaladsl/FlowMapWithResourceSpec.scala | 1 -
2 files changed, 4 insertions(+), 8 deletions(-)
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/routing/MetricsBasedResizerSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/routing/MetricsBasedResizerSpec.scala
index 984725d17d..59752e01b4 100644
---
a/actor-tests/src/test/scala/org/apache/pekko/routing/MetricsBasedResizerSpec.scala
+++
b/actor-tests/src/test/scala/org/apache/pekko/routing/MetricsBasedResizerSpec.scala
@@ -235,15 +235,12 @@ class MetricsBasedResizerSpec extends
PekkoSpec(ResizerSpec.config) with Default
"record the performance log with the correct pool size" in {
val resizer = DefaultOptimalSizeExploringResizer()
val router = TestRouter(routees(2))
- val msgs = router.sendToAll(await = true)
+ val msgs1 = router.sendToAll(await = true)
+ val msgs2 = router.sendToAll(await = false)
resizer.reportMessageCount(router.routees, router.msgs.size)
- msgs.head.second.open()
+ msgs1.head.second.open()
- router.mockSend(await = true, routeeIdx = 0)
- router.mockSend(await = false, routeeIdx = 1)
- awaitAssert {
- resizer.updatedStats(router.routees, router.msgs.size)._1.get(2)
should not be empty
- }
+ Await.ready(msgs2.head.first, timeout.duration)
resizer.reportMessageCount(router.routees, router.msgs.size)
resizer.performanceLog.get(2) should not be empty
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
index a3c2f9b400..103d07ff09 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
@@ -535,7 +535,6 @@ class FlowMapWithResourceSpec extends
StreamSpec(UnboundedMailboxConfig) {
.mapWithResource(create, (_: AutoCloseable, count) => count)
.runWith(Sink.never)(mat)
Await.result(created.future, 3.seconds.dilated) shouldBe Done
- closedCounter.get shouldBe 0
mat.shutdown()
matVal.failed.futureValue shouldBe an[AbruptTerminationException]
Await.result(promise.future, 3.seconds.dilated) shouldBe Done
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]