This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 4900599a96 Stabilize JDK 25 virtualized stream tests (#2945)
4900599a96 is described below

commit 4900599a96cac889f5d923d950f061a4f7bf73bf
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat May 9 05:24:10 2026 +0800

    Stabilize JDK 25 virtualized stream tests (#2945)
---
 .../org/apache/pekko/cluster/ClusterTestKit.scala  |  3 ++-
 .../pekko/cluster/MixedProtocolClusterSpec.scala   |  2 ++
 .../scaladsl/FlowMapAsyncUnorderedSpec.scala       |  3 ++-
 .../stream/scaladsl/FlowMapWithResourceSpec.scala  | 19 ++++++++++++------
 .../pekko/stream/scaladsl/FlowScanSpec.scala       |  9 +++++----
 .../org/apache/pekko/stream/scaladsl/HubSpec.scala | 23 +++++++++++-----------
 6 files changed, 36 insertions(+), 23 deletions(-)

diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/ClusterTestKit.scala b/cluster/src/test/scala/org/apache/pekko/cluster/ClusterTestKit.scala
index b5903cf314..b3aafde706 100644
--- a/cluster/src/test/scala/org/apache/pekko/cluster/ClusterTestKit.scala
+++ b/cluster/src/test/scala/org/apache/pekko/cluster/ClusterTestKit.scala
@@ -109,7 +109,8 @@ trait ClusterTestKit extends TestKitBase {
       actorSystems.contains(actorSystem)
 
     /** Shuts down all registered [[ActorSystem]]s */
-    def shutdownAll(): Unit = actorSystems.foreach(sys => shutdown(sys, 10.seconds, verifySystemShutdown = true))
+    // The timeout is dilated by TestKit; keep a larger base for virtualized JDK 25 nightly runs.
+    def shutdownAll(): Unit = actorSystems.foreach(sys => shutdown(sys, 30.seconds, verifySystemShutdown = true))
 
     /**
      * Force the passed [[ActorSystem]] to quit the cluster and shutdown.
diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala
index fc74fc7dbd..cf7d6b8dd6 100644
--- a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala
+++ b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala
@@ -37,6 +37,8 @@ object MixedProtocolClusterSpec {
       pekko.remote.accept-protocol-names = ["pekko", "akka"]
       pekko.remote.enforce-strict-config-prefix-check-on-join = on
 
+      pekko.coordinated-shutdown.phases.actor-system-terminate.timeout = 30 s
+
       pekko.cluster.downing-provider-class = "org.apache.pekko.cluster.sbr.SplitBrainResolverProvider"
       pekko.cluster.split-brain-resolver.active-strategy = keep-majority
       pekko.cluster.jmx.multi-mbeans-in-same-jvm = on""")
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala
index ef6c987dcd..d98f5fd79e 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala
@@ -28,6 +28,7 @@ import pekko.stream.ActorAttributes.supervisionStrategy
 import pekko.stream.Supervision.resumingDecider
 import pekko.stream.testkit._
 import pekko.stream.testkit.scaladsl._
+import pekko.testkit.TestDuration
 import pekko.testkit.TestLatch
 import pekko.testkit.TestProbe
 
@@ -361,7 +362,7 @@ class FlowMapAsyncUnorderedSpec extends StreamSpec {
         Source(1 to N)
           .mapAsyncUnordered(parallelism)(_ => deferred())
           .runFold(0)((c, _) => c + 1)
-          .futureValue(Timeout(3.seconds)) should ===(N)
+          .futureValue(Timeout(3.seconds.dilated)) should ===(N)
       } finally {
         timer.interrupt()
       }
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
index caf6dcdac4..a3c2f9b400 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
@@ -41,7 +41,7 @@ import pekko.stream.impl.StreamSupervisor.Children
 import pekko.stream.testkit.{ StreamSpec, TestSubscriber }
 import pekko.stream.testkit.Utils.{ assertDispatcher, TE, UnboundedMailboxConfig }
 import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
-import pekko.testkit.EventFilter
+import pekko.testkit.{ EventFilter, TestDuration }
 import pekko.util.ByteString
 
 class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) {
@@ -414,21 +414,24 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) {
       @nowarn("msg=deprecated")
       val mat = ActorMaterializer()
       val promise = Promise[Done]()
+      val created = Promise[Done]()
       val matVal = Source
         .single(1)
         .mapWithResource(() => {
+          created.trySuccess(Done)
           newBufferedReader()
         })((reader, count) => readLines(reader, count),
           reader => {
             reader.close()
-            promise.complete(Success(Done))
+            promise.tryComplete(Success(Done))
             Some(List("End"))
           })
         .mapConcat(identity)
         .runWith(Sink.never)(mat)
+      Await.result(created.future, 3.seconds.dilated) shouldBe Done
       mat.shutdown()
       matVal.failed.futureValue shouldBe an[AbruptTerminationException]
-      Await.result(promise.future, 3.seconds) shouldBe Done
+      Await.result(promise.future, 3.seconds.dilated) shouldBe Done
     }
 
     "will close the autocloseable resource when upstream complete" in {
@@ -517,21 +520,25 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) {
       @nowarn("msg=deprecated")
       val mat = ActorMaterializer()
       val promise = Promise[Done]()
-      val create = () =>
+      val created = Promise[Done]()
+      val create = () => {
+        created.trySuccess(Done)
         new AutoCloseable {
           override def close(): Unit = {
             closedCounter.incrementAndGet()
-            promise.complete(Success(Done))
+            promise.tryComplete(Success(Done))
           }
         }
+      }
       val matVal = Source
         .single(1)
         .mapWithResource(create, (_: AutoCloseable, count) => count)
         .runWith(Sink.never)(mat)
+      Await.result(created.future, 3.seconds.dilated) shouldBe Done
       closedCounter.get shouldBe 0
       mat.shutdown()
       matVal.failed.futureValue shouldBe an[AbruptTerminationException]
-      Await.result(promise.future, 3.seconds) shouldBe Done
+      Await.result(promise.future, 3.seconds.dilated) shouldBe Done
       closedCounter.get shouldBe 1
     }
 
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowScanSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowScanSpec.scala
index e4821ddab0..02f82347cb 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowScanSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowScanSpec.scala
@@ -33,7 +33,7 @@ class FlowScanSpec extends StreamSpec("""
 
   "A Scan" must {
 
-    def scan(s: Source[Int, NotUsed], duration: Duration = 5.seconds): immutable.Seq[Int] =
+    def scan(s: Source[Int, NotUsed], duration: Duration = remainingOrDefault): immutable.Seq[Int] =
       Await.result(s.scan(0)(_ + _).runFold(immutable.Seq.empty[Int])(_ :+ _), duration)
 
     "Scan" in {
@@ -52,7 +52,7 @@ class FlowScanSpec extends StreamSpec("""
 
     "emit values promptly" in {
       val f = Source.single(1).concat(Source.maybe[Int]).scan(0)(_ + _).take(2).runWith(Sink.seq)
-      Await.result(f, 1.second) should ===(Seq(0, 1))
+      Await.result(f, remainingOrDefault) should ===(Seq(0, 1))
     }
 
     "restart properly" in {
@@ -63,7 +63,7 @@ class FlowScanSpec extends StreamSpec("""
           old + current
         }
         .withAttributes(supervisionStrategy(Supervision.restartingDecider))
-      Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink()).toStrict(1.second) should ===(
+      Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink()).toStrict(3.seconds) should ===(
         Seq(0, 1, 4, 0, 5, 12))
     }
 
@@ -75,7 +75,8 @@ class FlowScanSpec extends StreamSpec("""
           old + current
         }
         .withAttributes(supervisionStrategy(Supervision.resumingDecider))
-      Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink()).toStrict(1.second) should ===(Seq(0, 1, 4, 9, 16))
+      Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink()).toStrict(3.seconds) should ===(
+        Seq(0, 1, 4, 9, 16))
     }
 
     "scan normally for empty source" in {
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
index 1a396d823e..a6d57dee19 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
@@ -29,6 +29,7 @@ import pekko.stream.testkit.Utils.TE
 import pekko.stream.testkit.scaladsl.TestSink
 import pekko.stream.testkit.scaladsl.TestSource
 import pekko.testkit.EventFilter
+import pekko.testkit.TestDuration
 
 import org.scalatest.time.{ Seconds, Span }
 
@@ -345,7 +346,7 @@ class HubSpec extends StreamSpec {
       val broadcast = Source(1 to 10).runWith(BroadcastHub.sink[Int](1, 256))
       val resultOne = broadcast.runWith(Sink.seq) // nothing happening yet
 
-      Await.result(resultOne, 1.second) should be(1 to 10) // fails
+      Await.result(resultOne, 3.seconds.dilated) should be(1 to 10)
     }
 
     "broadcast all elements to all consumers" in {
@@ -361,8 +362,8 @@ class HubSpec extends StreamSpec {
       }
       queue.complete() // only now is the source emptied
 
-      Await.result(resultOne, 1.second) should be(1 to 10)
-      Await.result(resultTwo, 1.second) should be(1 to 10)
+      Await.result(resultOne, 3.seconds.dilated) should be(1 to 10)
+      Await.result(resultTwo, 3.seconds.dilated) should be(1 to 10)
     }
 
     "broadcast all elements to all consumers with hot upstream" in {
@@ -370,8 +371,8 @@ class HubSpec extends StreamSpec {
       val resultOne = broadcast.runWith(Sink.seq) // nothing happening yet
       val resultTwo = broadcast.runWith(Sink.seq)
 
-      Await.result(resultOne, 1.second) should be(1 to 10)
-      Await.result(resultTwo, 1.second) should be(1 to 10)
+      Await.result(resultOne, 3.seconds.dilated) should be(1 to 10)
+      Await.result(resultTwo, 3.seconds.dilated) should be(1 to 10)
     }
 
     "broadcast all elements to all consumers with hot upstream even some subscriber unsubscribe" in {
@@ -383,8 +384,8 @@ class HubSpec extends StreamSpec {
       val resultOne = broadcast.runWith(Sink.seq) // nothing happening yet
       val resultTwo = broadcast.runWith(Sink.seq) // nothing happening yet
 
-      Await.result(resultOne, 1.second) should be(1 to 10)
-      Await.result(resultTwo, 1.second) should be(1 to 10)
+      Await.result(resultOne, 3.seconds.dilated) should be(1 to 10)
+      Await.result(resultTwo, 3.seconds.dilated) should be(1 to 10)
     }
 
     "send the same prefix to consumers attaching around the same time if one cancels earlier" in {
@@ -478,11 +479,11 @@ class HubSpec extends StreamSpec {
       source.runWith(Sink.fromSubscriber(downstream))
 
       downstream.request(1)
-      val first = downstream.expectNext()
+      val first = downstream.expectNext(10.seconds.dilated)
 
       for (i <- (first + 1) to (first + 10)) {
         downstream.request(1)
-        downstream.expectNext(i)
+        downstream.expectNext(10.seconds.dilated) should ===(i)
       }
 
       downstream.cancel()
@@ -554,7 +555,7 @@ class HubSpec extends StreamSpec {
       Thread.sleep(10)
 
       a[TE] shouldBe thrownBy {
-        Await.result(source.runWith(Sink.seq), 3.seconds)
+        Await.result(source.runWith(Sink.seq), 3.seconds.dilated)
       }
     }
 
@@ -884,7 +885,7 @@ class HubSpec extends StreamSpec {
       Thread.sleep(10)
 
       a[TE] shouldBe thrownBy {
-        Await.result(source.runWith(Sink.seq), 3.seconds)
+        Await.result(source.runWith(Sink.seq), 3.seconds.dilated)
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to