This is an automated email from the ASF dual-hosted git repository. He-Pin pushed a commit to branch fix-jdk25-virtual-thread-stream-shutdown in repository https://gitbox.apache.org/repos/asf/pekko.git
commit fa51c7b5bac2869709b92b4986fd0af7b341a736 Author: He-Pin <[email protected]> AuthorDate: Fri May 8 22:06:06 2026 +0800 Stabilize JDK 25 virtualized stream tests --- .../org/apache/pekko/cluster/ClusterTestKit.scala | 3 ++- .../pekko/cluster/MixedProtocolClusterSpec.scala | 2 ++ .../scaladsl/FlowMapAsyncUnorderedSpec.scala | 3 ++- .../stream/scaladsl/FlowMapWithResourceSpec.scala | 19 ++++++++++++------ .../pekko/stream/scaladsl/FlowScanSpec.scala | 9 +++++---- .../org/apache/pekko/stream/scaladsl/HubSpec.scala | 23 +++++++++++----------- 6 files changed, 36 insertions(+), 23 deletions(-) diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/ClusterTestKit.scala b/cluster/src/test/scala/org/apache/pekko/cluster/ClusterTestKit.scala index b5903cf314..b3aafde706 100644 --- a/cluster/src/test/scala/org/apache/pekko/cluster/ClusterTestKit.scala +++ b/cluster/src/test/scala/org/apache/pekko/cluster/ClusterTestKit.scala @@ -109,7 +109,8 @@ trait ClusterTestKit extends TestKitBase { actorSystems.contains(actorSystem) /** Shuts down all registered [[ActorSystem]]s */ - def shutdownAll(): Unit = actorSystems.foreach(sys => shutdown(sys, 10.seconds, verifySystemShutdown = true)) + // The timeout is dilated by TestKit; keep a larger base for virtualized JDK 25 nightly runs. + def shutdownAll(): Unit = actorSystems.foreach(sys => shutdown(sys, 30.seconds, verifySystemShutdown = true)) /** * Force the passed [[ActorSystem]] to quit the cluster and shutdown. diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala index fc74fc7dbd..cf7d6b8dd6 100644 --- a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala +++ b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala @@ -37,6 +37,8 @@ object MixedProtocolClusterSpec { pekko.remote.accept-protocol-names = ["pekko", "akka"] pekko.remote.enforce-strict-config-prefix-check-on-join = on + pekko.coordinated-shutdown.phases.actor-system-terminate.timeout = 30 s + pekko.cluster.downing-provider-class = "org.apache.pekko.cluster.sbr.SplitBrainResolverProvider" pekko.cluster.split-brain-resolver.active-strategy = keep-majority pekko.cluster.jmx.multi-mbeans-in-same-jvm = on""") diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala index ef6c987dcd..d98f5fd79e 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala @@ -28,6 +28,7 @@ import pekko.stream.ActorAttributes.supervisionStrategy import pekko.stream.Supervision.resumingDecider import pekko.stream.testkit._ import pekko.stream.testkit.scaladsl._ +import pekko.testkit.TestDuration import pekko.testkit.TestLatch import pekko.testkit.TestProbe @@ -361,7 +362,7 @@ class FlowMapAsyncUnorderedSpec extends StreamSpec { Source(1 to N) .mapAsyncUnordered(parallelism)(_ => deferred()) .runFold(0)((c, _) => c + 1) - .futureValue(Timeout(3.seconds)) should ===(N) + .futureValue(Timeout(3.seconds.dilated)) should ===(N) } finally { timer.interrupt() } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala index caf6dcdac4..a3c2f9b400 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala @@ -41,7 +41,7 @@ import pekko.stream.impl.StreamSupervisor.Children import pekko.stream.testkit.{ StreamSpec, TestSubscriber } import pekko.stream.testkit.Utils.{ assertDispatcher, TE, UnboundedMailboxConfig } import pekko.stream.testkit.scaladsl.{ TestSink, TestSource } -import pekko.testkit.EventFilter +import pekko.testkit.{ EventFilter, TestDuration } import pekko.util.ByteString class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) { @@ -414,21 +414,24 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) { @nowarn("msg=deprecated") val mat = ActorMaterializer() val promise = Promise[Done]() + val created = Promise[Done]() val matVal = Source .single(1) .mapWithResource(() => { + created.trySuccess(Done) newBufferedReader() })((reader, count) => readLines(reader, count), reader => { reader.close() - promise.complete(Success(Done)) + promise.tryComplete(Success(Done)) Some(List("End")) }) .mapConcat(identity) .runWith(Sink.never)(mat) + Await.result(created.future, 3.seconds.dilated) shouldBe Done mat.shutdown() matVal.failed.futureValue shouldBe an[AbruptTerminationException] - Await.result(promise.future, 3.seconds) shouldBe Done + Await.result(promise.future, 3.seconds.dilated) shouldBe Done } "will close the autocloseable resource when upstream complete" in { @@ -517,21 +520,25 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) { @nowarn("msg=deprecated") val mat = ActorMaterializer() val promise = Promise[Done]() - val create = () => + val created = Promise[Done]() + val create = () => { + created.trySuccess(Done) new AutoCloseable { override def close(): Unit = { closedCounter.incrementAndGet() - promise.complete(Success(Done)) + promise.tryComplete(Success(Done)) } } + } val matVal = Source .single(1) .mapWithResource(create, (_: AutoCloseable, count) => count) .runWith(Sink.never)(mat) + Await.result(created.future, 3.seconds.dilated) shouldBe Done closedCounter.get shouldBe 0 mat.shutdown() matVal.failed.futureValue shouldBe an[AbruptTerminationException] - Await.result(promise.future, 3.seconds) shouldBe Done + Await.result(promise.future, 3.seconds.dilated) shouldBe Done closedCounter.get shouldBe 1 } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowScanSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowScanSpec.scala index e4821ddab0..02f82347cb 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowScanSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowScanSpec.scala @@ -33,7 +33,7 @@ class FlowScanSpec extends StreamSpec(""" "A Scan" must { - def scan(s: Source[Int, NotUsed], duration: Duration = 5.seconds): immutable.Seq[Int] = + def scan(s: Source[Int, NotUsed], duration: Duration = remainingOrDefault): immutable.Seq[Int] = Await.result(s.scan(0)(_ + _).runFold(immutable.Seq.empty[Int])(_ :+ _), duration) "Scan" in { @@ -52,7 +52,7 @@ class FlowScanSpec extends StreamSpec(""" "emit values promptly" in { val f = Source.single(1).concat(Source.maybe[Int]).scan(0)(_ + _).take(2).runWith(Sink.seq) - Await.result(f, 1.second) should ===(Seq(0, 1)) + Await.result(f, remainingOrDefault) should ===(Seq(0, 1)) } "restart properly" in { @@ -63,7 +63,7 @@ class FlowScanSpec extends StreamSpec(""" old + current } .withAttributes(supervisionStrategy(Supervision.restartingDecider)) - Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink()).toStrict(1.second) should ===( + Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink()).toStrict(3.seconds) should ===( Seq(0, 1, 4, 0, 5, 12)) } @@ -75,7 +75,8 @@ class FlowScanSpec extends StreamSpec(""" old + current } .withAttributes(supervisionStrategy(Supervision.resumingDecider)) - Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink()).toStrict(1.second) should ===(Seq(0, 1, 4, 9, 16)) + Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink()).toStrict(3.seconds) should ===( + Seq(0, 1, 4, 9, 16)) } "scan normally for empty source" in { diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala index 1a396d823e..a6d57dee19 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala @@ -29,6 +29,7 @@ import pekko.stream.testkit.Utils.TE import pekko.stream.testkit.scaladsl.TestSink import pekko.stream.testkit.scaladsl.TestSource import pekko.testkit.EventFilter +import pekko.testkit.TestDuration import org.scalatest.time.{ Seconds, Span } @@ -345,7 +346,7 @@ class HubSpec extends StreamSpec { val broadcast = Source(1 to 10).runWith(BroadcastHub.sink[Int](1, 256)) val resultOne = broadcast.runWith(Sink.seq) // nothing happening yet - Await.result(resultOne, 1.second) should be(1 to 10) // fails + Await.result(resultOne, 3.seconds.dilated) should be(1 to 10) } "broadcast all elements to all consumers" in { @@ -361,8 +362,8 @@ class HubSpec extends StreamSpec { } queue.complete() // only now is the source emptied - Await.result(resultOne, 1.second) should be(1 to 10) - Await.result(resultTwo, 1.second) should be(1 to 10) + Await.result(resultOne, 3.seconds.dilated) should be(1 to 10) + Await.result(resultTwo, 3.seconds.dilated) should be(1 to 10) } "broadcast all elements to all consumers with hot upstream" in { @@ -370,8 +371,8 @@ class HubSpec extends StreamSpec { val resultOne = broadcast.runWith(Sink.seq) // nothing happening yet val resultTwo = broadcast.runWith(Sink.seq) - Await.result(resultOne, 1.second) should be(1 to 10) - Await.result(resultTwo, 1.second) should be(1 to 10) + Await.result(resultOne, 3.seconds.dilated) should be(1 to 10) + Await.result(resultTwo, 3.seconds.dilated) should be(1 to 10) } "broadcast all elements to all consumers with hot upstream even some subscriber unsubscribe" in { @@ -383,8 +384,8 @@ class HubSpec extends StreamSpec { val resultOne = broadcast.runWith(Sink.seq) // nothing happening yet val resultTwo = broadcast.runWith(Sink.seq) // nothing happening yet - Await.result(resultOne, 1.second) should be(1 to 10) - Await.result(resultTwo, 1.second) should be(1 to 10) + Await.result(resultOne, 3.seconds.dilated) should be(1 to 10) + Await.result(resultTwo, 3.seconds.dilated) should be(1 to 10) } "send the same prefix to consumers attaching around the same time if one cancels earlier" in { @@ -478,11 +479,11 @@ class HubSpec extends StreamSpec { source.runWith(Sink.fromSubscriber(downstream)) downstream.request(1) - val first = downstream.expectNext() + val first = downstream.expectNext(10.seconds.dilated) for (i <- (first + 1) to (first + 10)) { downstream.request(1) - downstream.expectNext(i) + downstream.expectNext(10.seconds.dilated) should ===(i) } downstream.cancel() @@ -554,7 +555,7 @@ class HubSpec extends StreamSpec { Thread.sleep(10) a[TE] shouldBe thrownBy { - Await.result(source.runWith(Sink.seq), 3.seconds) + Await.result(source.runWith(Sink.seq), 3.seconds.dilated) } } @@ -884,7 +885,7 @@ class HubSpec extends StreamSpec { Thread.sleep(10) a[TE] shouldBe thrownBy { - Await.result(source.runWith(Sink.seq), 3.seconds) + Await.result(source.runWith(Sink.seq), 3.seconds.dilated) } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
