This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 4900599a96 Stabilize JDK 25 virtualized stream tests (#2945)
4900599a96 is described below
commit 4900599a96cac889f5d923d950f061a4f7bf73bf
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat May 9 05:24:10 2026 +0800
Stabilize JDK 25 virtualized stream tests (#2945)
---
.../org/apache/pekko/cluster/ClusterTestKit.scala | 3 ++-
.../pekko/cluster/MixedProtocolClusterSpec.scala | 2 ++
.../scaladsl/FlowMapAsyncUnorderedSpec.scala | 3 ++-
.../stream/scaladsl/FlowMapWithResourceSpec.scala | 19 ++++++++++++------
.../pekko/stream/scaladsl/FlowScanSpec.scala | 9 +++++----
.../org/apache/pekko/stream/scaladsl/HubSpec.scala | 23 +++++++++++-----------
6 files changed, 36 insertions(+), 23 deletions(-)
diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/ClusterTestKit.scala b/cluster/src/test/scala/org/apache/pekko/cluster/ClusterTestKit.scala
index b5903cf314..b3aafde706 100644
--- a/cluster/src/test/scala/org/apache/pekko/cluster/ClusterTestKit.scala
+++ b/cluster/src/test/scala/org/apache/pekko/cluster/ClusterTestKit.scala
@@ -109,7 +109,8 @@ trait ClusterTestKit extends TestKitBase {
actorSystems.contains(actorSystem)
/** Shuts down all registered [[ActorSystem]]s */
- def shutdownAll(): Unit = actorSystems.foreach(sys => shutdown(sys, 10.seconds, verifySystemShutdown = true))
+ // The timeout is dilated by TestKit; keep a larger base for virtualized JDK 25 nightly runs.
+ def shutdownAll(): Unit = actorSystems.foreach(sys => shutdown(sys, 30.seconds, verifySystemShutdown = true))
/**
* Force the passed [[ActorSystem]] to quit the cluster and shutdown.
diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala
index fc74fc7dbd..cf7d6b8dd6 100644
--- a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala
+++ b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala
@@ -37,6 +37,8 @@ object MixedProtocolClusterSpec {
pekko.remote.accept-protocol-names = ["pekko", "akka"]
pekko.remote.enforce-strict-config-prefix-check-on-join = on
+ pekko.coordinated-shutdown.phases.actor-system-terminate.timeout = 30 s
+
pekko.cluster.downing-provider-class =
"org.apache.pekko.cluster.sbr.SplitBrainResolverProvider"
pekko.cluster.split-brain-resolver.active-strategy = keep-majority
pekko.cluster.jmx.multi-mbeans-in-same-jvm = on""")
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala
index ef6c987dcd..d98f5fd79e 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala
@@ -28,6 +28,7 @@ import pekko.stream.ActorAttributes.supervisionStrategy
import pekko.stream.Supervision.resumingDecider
import pekko.stream.testkit._
import pekko.stream.testkit.scaladsl._
+import pekko.testkit.TestDuration
import pekko.testkit.TestLatch
import pekko.testkit.TestProbe
@@ -361,7 +362,7 @@ class FlowMapAsyncUnorderedSpec extends StreamSpec {
Source(1 to N)
.mapAsyncUnordered(parallelism)(_ => deferred())
.runFold(0)((c, _) => c + 1)
- .futureValue(Timeout(3.seconds)) should ===(N)
+ .futureValue(Timeout(3.seconds.dilated)) should ===(N)
} finally {
timer.interrupt()
}
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
index caf6dcdac4..a3c2f9b400 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
@@ -41,7 +41,7 @@ import pekko.stream.impl.StreamSupervisor.Children
import pekko.stream.testkit.{ StreamSpec, TestSubscriber }
import pekko.stream.testkit.Utils.{ assertDispatcher, TE, UnboundedMailboxConfig }
import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
-import pekko.testkit.EventFilter
+import pekko.testkit.{ EventFilter, TestDuration }
import pekko.util.ByteString
class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) {
@@ -414,21 +414,24 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) {
@nowarn("msg=deprecated")
val mat = ActorMaterializer()
val promise = Promise[Done]()
+ val created = Promise[Done]()
val matVal = Source
.single(1)
.mapWithResource(() => {
+ created.trySuccess(Done)
newBufferedReader()
})((reader, count) => readLines(reader, count),
reader => {
reader.close()
- promise.complete(Success(Done))
+ promise.tryComplete(Success(Done))
Some(List("End"))
})
.mapConcat(identity)
.runWith(Sink.never)(mat)
+ Await.result(created.future, 3.seconds.dilated) shouldBe Done
mat.shutdown()
matVal.failed.futureValue shouldBe an[AbruptTerminationException]
- Await.result(promise.future, 3.seconds) shouldBe Done
+ Await.result(promise.future, 3.seconds.dilated) shouldBe Done
}
"will close the autocloseable resource when upstream complete" in {
@@ -517,21 +520,25 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) {
@nowarn("msg=deprecated")
val mat = ActorMaterializer()
val promise = Promise[Done]()
- val create = () =>
+ val created = Promise[Done]()
+ val create = () => {
+ created.trySuccess(Done)
new AutoCloseable {
override def close(): Unit = {
closedCounter.incrementAndGet()
- promise.complete(Success(Done))
+ promise.tryComplete(Success(Done))
}
}
+ }
val matVal = Source
.single(1)
.mapWithResource(create, (_: AutoCloseable, count) => count)
.runWith(Sink.never)(mat)
+ Await.result(created.future, 3.seconds.dilated) shouldBe Done
closedCounter.get shouldBe 0
mat.shutdown()
matVal.failed.futureValue shouldBe an[AbruptTerminationException]
- Await.result(promise.future, 3.seconds) shouldBe Done
+ Await.result(promise.future, 3.seconds.dilated) shouldBe Done
closedCounter.get shouldBe 1
}
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowScanSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowScanSpec.scala
index e4821ddab0..02f82347cb 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowScanSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowScanSpec.scala
@@ -33,7 +33,7 @@ class FlowScanSpec extends StreamSpec("""
"A Scan" must {
- def scan(s: Source[Int, NotUsed], duration: Duration = 5.seconds): immutable.Seq[Int] =
+ def scan(s: Source[Int, NotUsed], duration: Duration = remainingOrDefault): immutable.Seq[Int] =
Await.result(s.scan(0)(_ + _).runFold(immutable.Seq.empty[Int])(_ :+ _), duration)
"Scan" in {
@@ -52,7 +52,7 @@ class FlowScanSpec extends StreamSpec("""
"emit values promptly" in {
val f = Source.single(1).concat(Source.maybe[Int]).scan(0)(_ + _).take(2).runWith(Sink.seq)
- Await.result(f, 1.second) should ===(Seq(0, 1))
+ Await.result(f, remainingOrDefault) should ===(Seq(0, 1))
}
"restart properly" in {
@@ -63,7 +63,7 @@ class FlowScanSpec extends StreamSpec("""
old + current
}
.withAttributes(supervisionStrategy(Supervision.restartingDecider))
- Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink()).toStrict(1.second) should ===(
+ Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink()).toStrict(3.seconds) should ===(
Seq(0, 1, 4, 0, 5, 12))
}
@@ -75,7 +75,8 @@ class FlowScanSpec extends StreamSpec("""
old + current
}
.withAttributes(supervisionStrategy(Supervision.resumingDecider))
- Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink()).toStrict(1.second) should ===(Seq(0, 1, 4, 9, 16))
+ Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink()).toStrict(3.seconds) should ===(
+ Seq(0, 1, 4, 9, 16))
}
"scan normally for empty source" in {
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
index 1a396d823e..a6d57dee19 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
@@ -29,6 +29,7 @@ import pekko.stream.testkit.Utils.TE
import pekko.stream.testkit.scaladsl.TestSink
import pekko.stream.testkit.scaladsl.TestSource
import pekko.testkit.EventFilter
+import pekko.testkit.TestDuration
import org.scalatest.time.{ Seconds, Span }
@@ -345,7 +346,7 @@ class HubSpec extends StreamSpec {
val broadcast = Source(1 to 10).runWith(BroadcastHub.sink[Int](1, 256))
val resultOne = broadcast.runWith(Sink.seq) // nothing happening yet
- Await.result(resultOne, 1.second) should be(1 to 10) // fails
+ Await.result(resultOne, 3.seconds.dilated) should be(1 to 10)
}
"broadcast all elements to all consumers" in {
@@ -361,8 +362,8 @@ class HubSpec extends StreamSpec {
}
queue.complete() // only now is the source emptied
- Await.result(resultOne, 1.second) should be(1 to 10)
- Await.result(resultTwo, 1.second) should be(1 to 10)
+ Await.result(resultOne, 3.seconds.dilated) should be(1 to 10)
+ Await.result(resultTwo, 3.seconds.dilated) should be(1 to 10)
}
"broadcast all elements to all consumers with hot upstream" in {
@@ -370,8 +371,8 @@ class HubSpec extends StreamSpec {
val resultOne = broadcast.runWith(Sink.seq) // nothing happening yet
val resultTwo = broadcast.runWith(Sink.seq)
- Await.result(resultOne, 1.second) should be(1 to 10)
- Await.result(resultTwo, 1.second) should be(1 to 10)
+ Await.result(resultOne, 3.seconds.dilated) should be(1 to 10)
+ Await.result(resultTwo, 3.seconds.dilated) should be(1 to 10)
}
"broadcast all elements to all consumers with hot upstream even some subscriber unsubscribe" in {
@@ -383,8 +384,8 @@ class HubSpec extends StreamSpec {
val resultOne = broadcast.runWith(Sink.seq) // nothing happening yet
val resultTwo = broadcast.runWith(Sink.seq) // nothing happening yet
- Await.result(resultOne, 1.second) should be(1 to 10)
- Await.result(resultTwo, 1.second) should be(1 to 10)
+ Await.result(resultOne, 3.seconds.dilated) should be(1 to 10)
+ Await.result(resultTwo, 3.seconds.dilated) should be(1 to 10)
}
"send the same prefix to consumers attaching around the same time if one cancels earlier" in {
@@ -478,11 +479,11 @@ class HubSpec extends StreamSpec {
source.runWith(Sink.fromSubscriber(downstream))
downstream.request(1)
- val first = downstream.expectNext()
+ val first = downstream.expectNext(10.seconds.dilated)
for (i <- (first + 1) to (first + 10)) {
downstream.request(1)
- downstream.expectNext(i)
+ downstream.expectNext(10.seconds.dilated) should ===(i)
}
downstream.cancel()
@@ -554,7 +555,7 @@ class HubSpec extends StreamSpec {
Thread.sleep(10)
a[TE] shouldBe thrownBy {
- Await.result(source.runWith(Sink.seq), 3.seconds)
+ Await.result(source.runWith(Sink.seq), 3.seconds.dilated)
}
}
@@ -884,7 +885,7 @@ class HubSpec extends StreamSpec {
Thread.sleep(10)
a[TE] shouldBe thrownBy {
- Await.result(source.runWith(Sink.seq), 3.seconds)
+ Await.result(source.runWith(Sink.seq), 3.seconds.dilated)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]