This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new d5ee8a63e3 feat: Add close method on ActorSystem (#2486)
d5ee8a63e3 is described below
commit d5ee8a63e35e257853103685913fb7d766d53a4d
Author: Matthew de Detrich <[email protected]>
AuthorDate: Sat Nov 15 18:34:02 2025 +0100
feat: Add close method on ActorSystem (#2486)
---
.../testkit/typed/internal/ActorSystemStub.scala | 4 +++
.../org/apache/pekko/actor/ActorSystemTest.java | 11 ++++++++
.../org/apache/pekko/actor/ActorSystemSpec.scala | 17 +++++++++++-
.../pekko/dispatch/DispatcherShutdownSpec.scala | 2 +-
.../apache/pekko/actor/typed/ActorSystemTest.java | 12 ++++++++
.../apache/pekko/actor/typed/ActorSystemSpec.scala | 15 ++++++++++
.../actorsystem-close.excludes | 20 ++++++++++++++
.../org/apache/pekko/actor/typed/ActorSystem.scala | 24 ++++++++++++++--
.../internal/adapter/ActorSystemAdapter.scala | 2 ++
.../actorsystem-close.excludes | 21 ++++++++++++++
actor/src/main/resources/reference.conf | 6 ++++
.../scala/org/apache/pekko/actor/ActorSystem.scala | 32 ++++++++++++++++++++--
.../org/apache/pekko/actor/ActorBenchmark.scala | 9 ++----
.../pekko/actor/ActorCreationBenchmark.scala | 9 ++----
.../pekko/actor/ForkJoinActorBenchmark.scala | 8 ++----
.../pekko/actor/RouterPoolCreationBenchmark.scala | 7 ++---
.../org/apache/pekko/actor/ScheduleBenchmark.scala | 6 ++--
.../pekko/actor/StashCreationBenchmark.scala | 9 ++----
.../org/apache/pekko/actor/TellOnlyBenchmark.scala | 7 ++---
.../pekko/actor/typed/TypedActorBenchmark.scala | 6 ++--
.../actor/typed/TypedForkJoinActorBenchmark.scala | 6 ++--
.../typed/delivery/ReliableDeliveryBenchmark.scala | 6 ++--
.../persistence/LevelDbBatchingBenchmark.scala | 5 +---
.../PersistenceActorDeferBenchmark.scala | 6 +---
.../persistence/PersistentActorBenchmark.scala | 6 +---
...tentActorWithAtLeastOnceDeliveryBenchmark.scala | 4 +--
.../sharding/PersistentShardingMigrationSpec.scala | 4 +--
...ememberEntitiesShardIdExtractorChangeSpec.scala | 5 +---
.../cluster/client/ClusterClientStopSpec.scala | 3 +-
.../receptionist/ClusterReceptionistSpec.scala | 6 ++--
.../pekko/cluster/ddata/DurableDataSpec.scala | 2 +-
.../pekko/cluster/ddata/DurablePruningSpec.scala | 3 +-
docs/src/test/scala/docs/io/ReadBackPressure.scala | 2 +-
.../persistence/query/PersistenceQuerySpec.scala | 5 +---
.../persistence/serialization/SerializerSpec.scala | 8 ++----
.../persistence/EndToEndEventAdapterSpec.scala | 4 +--
.../remote/artery/SystemMessageDeliverySpec.scala | 2 +-
37 files changed, 198 insertions(+), 106 deletions(-)
diff --git
a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/ActorSystemStub.scala
b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/ActorSystemStub.scala
index 325cfb89c4..b1b77a7c04 100644
---
a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/ActorSystemStub.scala
+++
b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/ActorSystemStub.scala
@@ -113,6 +113,10 @@ import com.typesafe.config.{ Config, ConfigFactory }
override def terminate(): Unit = terminationPromise.trySuccess(Done)
override def whenTerminated: Future[Done] = terminationPromise.future
override def getWhenTerminated: CompletionStage[Done] = whenTerminated.asJava
+ override def close(): Unit = {
+ terminate()
+ Await.result(whenTerminated, scala.concurrent.duration.Duration.Inf)
+ }
override val startTime: Long = System.currentTimeMillis()
override def uptime: Long = System.currentTimeMillis() - startTime
override def threadFactory: java.util.concurrent.ThreadFactory = new
ThreadFactory {
diff --git
a/actor-tests/src/test/java/org/apache/pekko/actor/ActorSystemTest.java
b/actor-tests/src/test/java/org/apache/pekko/actor/ActorSystemTest.java
index 1aff57e5c5..15f421d1f4 100644
--- a/actor-tests/src/test/java/org/apache/pekko/actor/ActorSystemTest.java
+++ b/actor-tests/src/test/java/org/apache/pekko/actor/ActorSystemTest.java
@@ -15,6 +15,7 @@ package org.apache.pekko.actor;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.util.concurrent.CompletionStage;
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
@@ -47,4 +48,14 @@ public class ActorSystemTest extends JUnitSuite {
public void testGetWhenTerminatedWithoutTermination() {
assertFalse(system.getWhenTerminated().toCompletableFuture().isDone());
}
+
+ @Test
+ public void testTryWithResources() throws Exception {
+ ActorSystem system = null;
+ try (ActorSystem actorSystem = ActorSystem.create()) {
+ system = actorSystem;
+ }
+ final CompletionStage<Terminated> cs = system.getWhenTerminated();
+ assertTrue(cs.toCompletableFuture().isDone());
+ }
}
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/actor/ActorSystemSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/actor/ActorSystemSpec.scala
index 022c600eba..d791552dc5 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/actor/ActorSystemSpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/actor/ActorSystemSpec.scala
@@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.nowarn
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
+import scala.util.Using
import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks
@@ -263,7 +264,7 @@ class ActorSystemSpec extends
PekkoSpec(ActorSystemSpec.config) with ImplicitSen
"throw RejectedExecutionException when shutdown" in {
val system2 = ActorSystem("RejectedExecution-1", PekkoSpec.testConf)
- Await.ready(system2.terminate(), 10.seconds)
+ system2.close()
intercept[RejectedExecutionException] {
system2.registerOnTermination { println("IF YOU SEE THIS THEN THERE'S
A BUG HERE") }
@@ -334,6 +335,20 @@ class ActorSystemSpec extends
PekkoSpec(ActorSystemSpec.config) with ImplicitSen
}
}
+ "close method terminates ActorSystem" in {
+ val system = ActorSystem()
+ system.close()
+ system.whenTerminated.isCompleted should ===(true)
+ }
+
+ "Scala's Using automatically terminates ActorSystem" in {
+ var currentSystem: ActorSystem = null
+ Using(ActorSystem()) { system =>
+ currentSystem = system
+ }
+ currentSystem.whenTerminated.isCompleted should ===(true)
+ }
+
"allow configuration of guardian supervisor strategy" in {
implicit val system =
ActorSystem(
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/dispatch/DispatcherShutdownSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/dispatch/DispatcherShutdownSpec.scala
index 2e00f0195b..a05f3cd5a1 100644
---
a/actor-tests/src/test/scala/org/apache/pekko/dispatch/DispatcherShutdownSpec.scala
+++
b/actor-tests/src/test/scala/org/apache/pekko/dispatch/DispatcherShutdownSpec.scala
@@ -45,7 +45,7 @@ class DispatcherShutdownSpec extends AnyWordSpec with
Matchers {
val system = ActorSystem("DispatcherShutdownSpec")
threadCount should be > 0
- Await.ready(system.terminate(), 1.second)
+ system.close()
Await.ready(Future(pekko.Done)(system.dispatcher), 1.second)
TestKit.awaitCond(threadCount == 0, 3.second)
diff --git
a/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/ActorSystemTest.java
b/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/ActorSystemTest.java
index 8460885881..99dbd65ad5 100644
---
a/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/ActorSystemTest.java
+++
b/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/ActorSystemTest.java
@@ -15,6 +15,7 @@ package org.apache.pekko.actor.typed;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.util.concurrent.CompletionStage;
import org.apache.pekko.Done;
@@ -39,4 +40,15 @@ public class ActorSystemTest extends JUnitSuite {
ActorSystem.create(Behaviors.empty(),
"GetWhenTerminatedWithoutTermination");
assertFalse(system.getWhenTerminated().toCompletableFuture().isDone());
}
+
+ @Test
+ public void testTryWithResources() throws Exception {
+ ActorSystem<Void> system = null;
+ try (ActorSystem<Void> actorSystem =
+ ActorSystem.create(Behaviors.empty(), "TryWithResourcesSystem")) {
+ system = actorSystem;
+ }
+ final CompletionStage<Done> cs = system.getWhenTerminated();
+ assertTrue(cs.toCompletableFuture().isDone());
+ }
}
diff --git
a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/ActorSystemSpec.scala
b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/ActorSystemSpec.scala
index 2cab64f2d8..2fb49c5ffb 100644
---
a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/ActorSystemSpec.scala
+++
b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/ActorSystemSpec.scala
@@ -18,6 +18,7 @@
package org.apache.pekko.actor.typed
import scala.annotation.nowarn
+import scala.util.Using
import org.apache.pekko
import pekko.actor.typed.scaladsl.Behaviors
@@ -45,5 +46,19 @@ class ActorSystemSpec extends PekkoSpec {
system.terminate()
}
}
+
+ "close method terminates ActorSystem" in {
+ val system = ActorSystem(Behaviors.empty[String],
"close-terminates-system")
+ system.close()
+ system.whenTerminated.isCompleted should ===(true)
+ }
+
+ "Scala's Using automatically terminates ActorSystem" in {
+ var currentSystem: ActorSystem[Nothing] = null
+ Using(ActorSystem(Behaviors.empty[String], "using-terminates-system")) {
system =>
+ currentSystem = system
+ }
+ currentSystem.whenTerminated.isCompleted should ===(true)
+ }
}
}
diff --git
a/actor-typed/src/main/mima-filters/1.3.x.backwards.excludes/actorsystem-close.excludes
b/actor-typed/src/main/mima-filters/1.3.x.backwards.excludes/actorsystem-close.excludes
new file mode 100644
index 0000000000..c41998e1bd
--- /dev/null
+++
b/actor-typed/src/main/mima-filters/1.3.x.backwards.excludes/actorsystem-close.excludes
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Add close to ActorSystem
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.actor.typed.ActorSystem.close")
+ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.pekko.actor.typed.ActorSystem.close")
diff --git
a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/ActorSystem.scala
b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/ActorSystem.scala
index 4610e563c0..d8a3b74913 100644
--- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/ActorSystem.scala
+++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/ActorSystem.scala
@@ -13,7 +13,7 @@
package org.apache.pekko.actor.typed
-import java.util.concurrent.{ CompletionStage, ThreadFactory }
+import java.util.concurrent.{ CompletionStage, ThreadFactory, TimeoutException
}
import scala.concurrent.{ ExecutionContextExecutor, Future }
@@ -42,7 +42,7 @@ import com.typesafe.config.{ Config, ConfigFactory }
* Not for user extension.
*/
@DoNotInherit
-abstract class ActorSystem[-T] extends ActorRef[T] with Extensions with
ClassicActorSystemProvider {
+abstract class ActorSystem[-T] extends ActorRef[T] with Extensions with
ClassicActorSystemProvider with AutoCloseable {
this: InternalRecipientRef[T] =>
/**
@@ -147,6 +147,26 @@ abstract class ActorSystem[-T] extends ActorRef[T] with
Extensions with ClassicA
*/
def getWhenTerminated: CompletionStage[Done]
+ /**
+ * Terminates this actor system by running
[[pekko.actor.CoordinatedShutdown]] with reason
+ * [[pekko.actor.CoordinatedShutdown.ActorSystemTerminateReason]]. This
method will block
+ * until either the actor system is terminated or
+ * `pekko.coordinated-shutdown.close-actor-system-timeout` timeout duration
is
+ * passed, in which case a [[TimeoutException]] is thrown.
+ *
+ * If `pekko.coordinated-shutdown.run-by-actor-system-terminate` is
configured to `off`
+ * it will not run `CoordinatedShutdown`, but the `ActorSystem` and its
actors
+ * will still be terminated.
+ *
+ * This will stop the guardian actor, which in turn
+ * will recursively stop all its child actors, and finally the system
guardian
+ * (below which the logging actors reside) and then execute all registered
+ * termination handlers (see
[[pekko.actor.ActorSystem.registerOnTermination]]).
+ * @since 1.3.0
+ */
+ @throws(classOf[TimeoutException])
+ override def close(): Unit
+
/**
* The deadLetter address is a destination that will accept (and discard)
* every message sent to it.
diff --git
a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/adapter/ActorSystemAdapter.scala
b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/adapter/ActorSystemAdapter.scala
index 8a1c8965aa..2cf322c878 100644
---
a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/adapter/ActorSystemAdapter.scala
+++
b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/adapter/ActorSystemAdapter.scala
@@ -122,6 +122,8 @@ import org.slf4j.{ Logger, LoggerFactory }
override lazy val getWhenTerminated: CompletionStage[pekko.Done] =
whenTerminated.asJava
+ override def close(): Unit = system.close()
+
override def systemActorOf[U](behavior: Behavior[U], name: String, props:
Props): ActorRef[U] = {
val ref = system.systemActorOf(
PropsAdapter(
diff --git
a/actor/src/main/mima-filters/1.3.x.backwards.excludes/actorsystem-close.excludes
b/actor/src/main/mima-filters/1.3.x.backwards.excludes/actorsystem-close.excludes
new file mode 100644
index 0000000000..1721d250ac
--- /dev/null
+++
b/actor/src/main/mima-filters/1.3.x.backwards.excludes/actorsystem-close.excludes
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Add close to ActorSystem
+ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.pekko.actor.ActorSystem.close")
+ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.pekko.actor.ExtendedActorSystem.close")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.actor.ActorSystem.close")
diff --git a/actor/src/main/resources/reference.conf
b/actor/src/main/resources/reference.conf
index 9b12fb106b..aa435557ae 100644
--- a/actor/src/main/resources/reference.conf
+++ b/actor/src/main/resources/reference.conf
@@ -1244,6 +1244,12 @@ pekko {
# Terminate the ActorSystem in the last phase actor-system-terminate.
terminate-actor-system = on
+ # The timeout that will be used when calling .close on an ActorSystem.
+ # This timeout will also be used when ActorSystem's are automatically
+ # terminated by using Java's try-with-resources or Scala's
+ # scala.util.Using
+ close-actor-system-timeout = 60 s
+
# Exit the JVM (System.exit(0)) in the last phase actor-system-terminate
# if this is set to 'on'. It is done after termination of the
# ActorSystem if terminate-actor-system=on, otherwise it is done
diff --git a/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala
b/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala
index d114981732..dd4cdfb885 100644
--- a/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala
+++ b/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable
-import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future,
Promise }
+import scala.concurrent.{ Await, ExecutionContext, ExecutionContextExecutor,
Future, Promise }
import scala.concurrent.blocking
import scala.concurrent.duration.Duration
import scala.jdk.CollectionConverters._
@@ -524,7 +524,7 @@ object ActorSystem {
* extending [[pekko.actor.ExtendedActorSystem]] instead, but beware that you
* are completely on your own in that case!
*/
-abstract class ActorSystem extends ActorRefFactory with
ClassicActorSystemProvider {
+abstract class ActorSystem extends ActorRefFactory with
ClassicActorSystemProvider with AutoCloseable {
import ActorSystem._
/**
@@ -677,6 +677,26 @@ abstract class ActorSystem extends ActorRefFactory with
ClassicActorSystemProvid
*/
def terminate(): Future[Terminated]
+ /**
+ * Terminates this actor system by running [[CoordinatedShutdown]] with
reason
+ * [[CoordinatedShutdown.ActorSystemTerminateReason]]. This method will block
+ * until either the actor system is terminated or
+ * `pekko.coordinated-shutdown.close-actor-system-timeout` timeout duration
is
+ * passed, in which case a [[TimeoutException]] is thrown.
+ *
+ * If `pekko.coordinated-shutdown.run-by-actor-system-terminate` is
configured to `off`
+ * it will not run `CoordinatedShutdown`, but the `ActorSystem` and its
actors
+ * will still be terminated.
+ *
+ * This will stop the guardian actor, which in turn
+ * will recursively stop all its child actors, and finally the system
guardian
+ * (below which the logging actors reside) and then execute all registered
+ * termination handlers (see [[ActorSystem#registerOnTermination]]).
+ * @since 1.3.0
+ */
+ @throws(classOf[TimeoutException])
+ override def close(): Unit
+
/**
* Returns a Future which will be completed after the ActorSystem has been
terminated
* and termination hooks have been executed. If you registered any callback
with
@@ -1080,6 +1100,14 @@ private[pekko] class ActorSystemImpl(
whenTerminated
}
+ override def close(): Unit = {
+ terminate()
+ val duration =
+
Duration(settings.config.getDuration("pekko.coordinated-shutdown.close-actor-system-timeout").toMillis,
+ TimeUnit.MILLISECONDS)
+ Await.result(whenTerminated, duration)
+ }
+
override private[pekko] def finalTerminate(): Unit = {
terminating = true
// these actions are idempotent
diff --git
a/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorBenchmark.scala
b/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorBenchmark.scala
index 8d3e9a74d5..360b450433 100644
--- a/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorBenchmark.scala
+++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorBenchmark.scala
@@ -15,9 +15,6 @@ package org.apache.pekko.actor
import java.util.concurrent.TimeUnit
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
import BenchmarkActors._
import org.openjdk.jmh.annotations._
@@ -100,10 +97,8 @@ class ActorBenchmark {
}
@TearDown(Level.Trial)
- def shutdown(): Unit = {
- system.terminate()
- Await.ready(system.whenTerminated, 15.seconds)
- }
+ def shutdown(): Unit =
+ system.close()
@Benchmark
@OperationsPerInvocation(totalMessages)
diff --git
a/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorCreationBenchmark.scala
b/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorCreationBenchmark.scala
index 05317241e9..ddefd5f775 100644
---
a/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorCreationBenchmark.scala
+++
b/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorCreationBenchmark.scala
@@ -15,9 +15,6 @@ package org.apache.pekko.actor
import java.util.concurrent.TimeUnit
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
import org.openjdk.jmh.annotations._
/*
@@ -46,10 +43,8 @@ class ActorCreationBenchmark {
}
@TearDown(Level.Trial)
- def shutdown(): Unit = {
- system.terminate()
- Await.ready(system.whenTerminated, 15.seconds)
- }
+ def shutdown(): Unit =
+ system.close()
@Benchmark
@OutputTimeUnit(TimeUnit.MICROSECONDS)
diff --git
a/bench-jmh/src/main/scala/org/apache/pekko/actor/ForkJoinActorBenchmark.scala
b/bench-jmh/src/main/scala/org/apache/pekko/actor/ForkJoinActorBenchmark.scala
index 1d0d398c05..04e846406a 100644
---
a/bench-jmh/src/main/scala/org/apache/pekko/actor/ForkJoinActorBenchmark.scala
+++
b/bench-jmh/src/main/scala/org/apache/pekko/actor/ForkJoinActorBenchmark.scala
@@ -16,8 +16,6 @@ package org.apache.pekko.actor
import java.util.concurrent.TimeUnit
import scala.annotation.tailrec
-import scala.concurrent.Await
-import scala.concurrent.duration._
import BenchmarkActors._
import org.openjdk.jmh.annotations._
@@ -78,10 +76,8 @@ class ForkJoinActorBenchmark {
}
@TearDown(Level.Trial)
- def shutdown(): Unit = {
- system.terminate()
- Await.ready(system.whenTerminated, 15.seconds)
- }
+ def shutdown(): Unit =
+ system.close()
// @Benchmark
// @OperationsPerInvocation(totalMessagesTwoActors)
diff --git
a/bench-jmh/src/main/scala/org/apache/pekko/actor/RouterPoolCreationBenchmark.scala
b/bench-jmh/src/main/scala/org/apache/pekko/actor/RouterPoolCreationBenchmark.scala
index 097f333b1c..bec9bb0a1b 100644
---
a/bench-jmh/src/main/scala/org/apache/pekko/actor/RouterPoolCreationBenchmark.scala
+++
b/bench-jmh/src/main/scala/org/apache/pekko/actor/RouterPoolCreationBenchmark.scala
@@ -15,7 +15,6 @@ package org.apache.pekko.actor
import java.util.concurrent.TimeUnit
-import scala.concurrent.Await
import scala.concurrent.duration._
import org.openjdk.jmh.annotations._
@@ -40,10 +39,8 @@ class RouterPoolCreationBenchmark {
var size = 0
@TearDown(Level.Trial)
- def shutdown(): Unit = {
- system.terminate()
- Await.ready(system.whenTerminated, 15.seconds)
- }
+ def shutdown(): Unit =
+ system.close()
@Benchmark
@OutputTimeUnit(TimeUnit.MICROSECONDS)
diff --git
a/bench-jmh/src/main/scala/org/apache/pekko/actor/ScheduleBenchmark.scala
b/bench-jmh/src/main/scala/org/apache/pekko/actor/ScheduleBenchmark.scala
index 3cdba68193..97ce680f5a 100644
--- a/bench-jmh/src/main/scala/org/apache/pekko/actor/ScheduleBenchmark.scala
+++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/ScheduleBenchmark.scala
@@ -52,10 +52,8 @@ class ScheduleBenchmark {
}
@TearDown
- def shutdown(): Unit = {
- system.terminate()
- Await.ready(system.whenTerminated, 15.seconds)
- }
+ def shutdown(): Unit =
+ system.close()
def op(idx: Int) = if (idx == winner) promise.trySuccess(idx) else idx
diff --git
a/bench-jmh/src/main/scala/org/apache/pekko/actor/StashCreationBenchmark.scala
b/bench-jmh/src/main/scala/org/apache/pekko/actor/StashCreationBenchmark.scala
index ee477beb04..09d5b4c733 100644
---
a/bench-jmh/src/main/scala/org/apache/pekko/actor/StashCreationBenchmark.scala
+++
b/bench-jmh/src/main/scala/org/apache/pekko/actor/StashCreationBenchmark.scala
@@ -15,9 +15,6 @@ package org.apache.pekko.actor
import java.util.concurrent.TimeUnit
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
import org.openjdk.jmh.annotations._
import org.apache.pekko.testkit.TestProbe
@@ -49,10 +46,8 @@ class StashCreationBenchmark {
val probe = TestProbe()
@TearDown(Level.Trial)
- def shutdown(): Unit = {
- system.terminate()
- Await.ready(system.whenTerminated, 15.seconds)
- }
+ def shutdown(): Unit =
+ system.close()
@Benchmark
@OutputTimeUnit(TimeUnit.MICROSECONDS)
diff --git
a/bench-jmh/src/main/scala/org/apache/pekko/actor/TellOnlyBenchmark.scala
b/bench-jmh/src/main/scala/org/apache/pekko/actor/TellOnlyBenchmark.scala
index 722fcbf8de..9933c0b26c 100644
--- a/bench-jmh/src/main/scala/org/apache/pekko/actor/TellOnlyBenchmark.scala
+++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/TellOnlyBenchmark.scala
@@ -15,7 +15,6 @@ package org.apache.pekko.actor
import java.util.concurrent.TimeUnit
-import scala.concurrent.Await
import scala.concurrent.duration._
import org.openjdk.jmh.annotations._
@@ -65,10 +64,8 @@ class TellOnlyBenchmark {
}
@TearDown(Level.Trial)
- def shutdown(): Unit = {
- system.terminate()
- Await.ready(system.whenTerminated, 15.seconds)
- }
+ def shutdown(): Unit =
+ system.close()
var actor: ActorRef = _
var probe: TestProbe = _
diff --git
a/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedActorBenchmark.scala
b/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedActorBenchmark.scala
index 1760a013e5..d09e70c44f 100644
---
a/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedActorBenchmark.scala
+++
b/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedActorBenchmark.scala
@@ -100,10 +100,8 @@ class TypedActorBenchmark {
}
@TearDown(Level.Trial)
- def shutdown(): Unit = {
- system.terminate()
- Await.ready(system.whenTerminated, 15.seconds)
- }
+ def shutdown(): Unit =
+ system.close()
@Benchmark
@OperationsPerInvocation(totalMessages)
diff --git
a/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedForkJoinActorBenchmark.scala
b/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedForkJoinActorBenchmark.scala
index b986a355ea..8ac10da4c3 100644
---
a/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedForkJoinActorBenchmark.scala
+++
b/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedForkJoinActorBenchmark.scala
@@ -117,10 +117,8 @@ class TypedForkJoinActorBenchmark {
}
@TearDown(Level.Trial)
- def shutdown(): Unit = {
- system.terminate()
- Await.ready(system.whenTerminated, 15.seconds)
- }
+ def shutdown(): Unit =
+ system.close()
}
object TypedForkJoinActorBenchmark {
diff --git
a/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/delivery/ReliableDeliveryBenchmark.scala
b/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/delivery/ReliableDeliveryBenchmark.scala
index d8704548d7..654bbaf6b4 100644
---
a/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/delivery/ReliableDeliveryBenchmark.scala
+++
b/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/delivery/ReliableDeliveryBenchmark.scala
@@ -237,10 +237,8 @@ class ReliableDeliveryBenchmark {
}
@TearDown(Level.Trial)
- def shutdown(): Unit = {
- system.terminate()
- Await.ready(system.whenTerminated, 15.seconds)
- }
+ def shutdown(): Unit =
+ system.close()
@Benchmark
@OperationsPerInvocation(messagesPerOperation)
diff --git
a/bench-jmh/src/main/scala/org/apache/pekko/persistence/LevelDbBatchingBenchmark.scala
b/bench-jmh/src/main/scala/org/apache/pekko/persistence/LevelDbBatchingBenchmark.scala
index e129d0f5a1..d133f43ed9 100644
---
a/bench-jmh/src/main/scala/org/apache/pekko/persistence/LevelDbBatchingBenchmark.scala
+++
b/bench-jmh/src/main/scala/org/apache/pekko/persistence/LevelDbBatchingBenchmark.scala
@@ -17,8 +17,6 @@ import java.io.File
import java.util.concurrent.TimeUnit
import scala.annotation.nowarn
-import scala.concurrent.Await
-import scala.concurrent.duration._
import org.apache.commons.io.FileUtils
import org.openjdk.jmh.annotations._
@@ -75,8 +73,7 @@ class LevelDbBatchingBenchmark {
store ! PoisonPill
Thread.sleep(500)
- sys.terminate()
- Await.ready(sys.whenTerminated, 10.seconds)
+ sys.close()
}
@Benchmark
diff --git
a/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistenceActorDeferBenchmark.scala
b/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistenceActorDeferBenchmark.scala
index 31ee2d4fc5..73282ae8cd 100644
---
a/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistenceActorDeferBenchmark.scala
+++
b/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistenceActorDeferBenchmark.scala
@@ -15,9 +15,6 @@ package org.apache.pekko.persistence
import java.io.File
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
import org.apache.commons.io.FileUtils
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.annotations.Scope
@@ -71,8 +68,7 @@ class PersistentActorDeferBenchmark {
@TearDown
def shutdown(): Unit = {
- system.terminate()
- Await.ready(system.whenTerminated, 15.seconds)
+ system.close()
storageLocations.foreach(FileUtils.deleteDirectory)
}
diff --git
a/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorBenchmark.scala
b/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorBenchmark.scala
index b69696fc05..1dbea3f4e6 100644
---
a/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorBenchmark.scala
+++
b/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorBenchmark.scala
@@ -15,9 +15,6 @@ package org.apache.pekko.persistence
import java.io.File
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
import org.apache.commons.io.FileUtils
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.annotations.Scope
@@ -69,8 +66,7 @@ class PersistentActorThroughputBenchmark {
@TearDown
def shutdown(): Unit = {
- system.terminate()
- Await.ready(system.whenTerminated, 15.seconds)
+ system.close()
storageLocations.foreach(FileUtils.deleteDirectory)
}
diff --git
a/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala
b/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala
index 4a8860c538..bc67effab1 100644
---
a/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala
+++
b/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala
@@ -15,7 +15,6 @@ package org.apache.pekko.persistence
import java.io.File
-import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.commons.io.FileUtils
@@ -72,8 +71,7 @@ class PersistentActorWithAtLeastOnceDeliveryBenchmark {
@TearDown
def shutdown(): Unit = {
- system.terminate()
- Await.ready(system.whenTerminated, 15.seconds)
+ system.close()
storageLocations.foreach(FileUtils.deleteDirectory)
}
diff --git
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/PersistentShardingMigrationSpec.scala
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/PersistentShardingMigrationSpec.scala
index abe10c610a..49edf1ccbf 100644
---
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/PersistentShardingMigrationSpec.scala
+++
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/PersistentShardingMigrationSpec.scala
@@ -12,9 +12,9 @@
*/
package org.apache.pekko.cluster.sharding
+
import java.util.UUID
-import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.pekko
@@ -184,7 +184,7 @@ class PersistentShardingMigrationSpec extends
PekkoSpec(PersistentShardingMigrat
extractShardId(rememberedEntitiesProbe.ref))
f(system, region, rememberedEntitiesProbe)
} finally {
- Await.ready(system.terminate(), 20.seconds)
+ system.close()
}
}
diff --git
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala
index f73a0e1a4a..2419ae053d 100644
---
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala
+++
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala
@@ -15,9 +15,6 @@ package org.apache.pekko.cluster.sharding
import java.util.UUID
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
import org.apache.pekko
import pekko.actor.ActorRef
import pekko.actor.ActorSystem
@@ -145,7 +142,7 @@ class RememberEntitiesShardIdExtractorChangeSpec
val region = ClusterSharding(system).start(TypeName, Props(new PA()),
extractEntityId, extractShardId)
f(system, region)
} finally {
- Await.ready(system.terminate(), 20.seconds)
+ system.close()
}
}
diff --git
a/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientStopSpec.scala
b/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientStopSpec.scala
index ccba3d8763..548a8e797e 100644
---
a/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientStopSpec.scala
+++
b/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientStopSpec.scala
@@ -14,7 +14,6 @@
package org.apache.pekko.cluster.client
import scala.annotation.nowarn
-import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.pekko
@@ -117,7 +116,7 @@ class ClusterClientStopSpec extends
MultiNodeSpec(ClusterClientStopSpec) with ST
runOn(first, second) {
enterBarrier("was-in-contact")
- Await.ready(system.terminate(), 10.seconds)
+ system.close()
}
diff --git
a/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala
b/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala
index 15408b56ee..88ce7f1a0e 100644
---
a/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala
+++
b/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala
@@ -205,8 +205,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with
Matchers with LogCapturin
if (down) {
// abrupt termination
- system2.terminate()
- Await.ready(system2.whenTerminated, 10.seconds)
+ system2.close()
clusterNode1.manager ! Down(clusterNode2.selfMember.address)
} else {
clusterNode1.manager ! Leave(clusterNode2.selfMember.address)
@@ -368,8 +367,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with
Matchers with LogCapturin
// abrupt termination but then a node with the same host:port comes
online quickly
system1.log.debug("Terminating system2: [{}]",
clusterNode2.selfMember.uniqueAddress)
- system2.terminate()
- Await.ready(system2.whenTerminated, 10.seconds)
+ system2.close()
val testKit3 = ActorTestKit(
system1.name,
diff --git
a/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurableDataSpec.scala
b/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurableDataSpec.scala
index a06e7a22db..e051f785ed 100644
---
a/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurableDataSpec.scala
+++
b/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurableDataSpec.scala
@@ -290,7 +290,7 @@ abstract class DurableDataSpec(multiNodeConfig:
DurableDataSpecConfig)
expectTerminated(r)
}
} finally {
- Await.ready(sys1.terminate(), 10.seconds)
+ sys1.close()
}
val sys2 = ActorSystem(
diff --git
a/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurablePruningSpec.scala
b/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurablePruningSpec.scala
index 7e66be8142..8a3da0184e 100644
---
a/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurablePruningSpec.scala
+++
b/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurablePruningSpec.scala
@@ -13,7 +13,6 @@
package org.apache.pekko.cluster.ddata
-import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.pekko
@@ -147,7 +146,7 @@ class DurablePruningSpec extends
MultiNodeSpec(DurablePruningSpec) with STMultiN
}
enterBarrier("removed")
runOn(first) {
- Await.ready(sys2.terminate(), 5.seconds)
+ sys2.close()
}
within(15.seconds) {
diff --git a/docs/src/test/scala/docs/io/ReadBackPressure.scala
b/docs/src/test/scala/docs/io/ReadBackPressure.scala
index cab26fb9ac..9ce9fae5b9 100644
--- a/docs/src/test/scala/docs/io/ReadBackPressure.scala
+++ b/docs/src/test/scala/docs/io/ReadBackPressure.scala
@@ -90,6 +90,6 @@ class PullReadingSpec extends PekkoSpec with ImplicitSender {
client.send(connection, ResumeReading)
client.expectMsg(Received(ByteString("hello")))
- Await.ready(system.terminate(), Duration.Inf)
+ system.close()
}
}
diff --git
a/persistence-query/src/test/scala/org/apache/pekko/persistence/query/PersistenceQuerySpec.scala
b/persistence-query/src/test/scala/org/apache/pekko/persistence/query/PersistenceQuerySpec.scala
index 44e3323ef2..6e04f54236 100644
---
a/persistence-query/src/test/scala/org/apache/pekko/persistence/query/PersistenceQuerySpec.scala
+++
b/persistence-query/src/test/scala/org/apache/pekko/persistence/query/PersistenceQuerySpec.scala
@@ -15,9 +15,6 @@ package org.apache.pekko.persistence.query
import java.util.concurrent.atomic.AtomicInteger
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.journal.{ EventSeq, ReadEventAdapter }
@@ -102,7 +99,7 @@ class PersistenceQuerySpec extends AnyWordSpecLike with
Matchers with BeforeAndA
val sys = ActorSystem(s"sys-${systemCounter.incrementAndGet()}", config)
try block(sys)
- finally Await.ready(sys.terminate(), 10.seconds)
+ finally sys.close()
}
}
diff --git
a/persistence-shared/src/test/scala/org/apache/pekko/persistence/serialization/SerializerSpec.scala
b/persistence-shared/src/test/scala/org/apache/pekko/persistence/serialization/SerializerSpec.scala
index ae49b5dbfb..834fd124be 100644
---
a/persistence-shared/src/test/scala/org/apache/pekko/persistence/serialization/SerializerSpec.scala
+++
b/persistence-shared/src/test/scala/org/apache/pekko/persistence/serialization/SerializerSpec.scala
@@ -16,9 +16,6 @@ package org.apache.pekko.persistence.serialization
import java.io.NotSerializableException
import java.util.UUID
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
import org.apache.commons.codec.binary.Hex.{ decodeHex, encodeHex }
import org.apache.pekko
@@ -348,9 +345,8 @@ class MessageSerializerRemotingSpec extends
PekkoSpec(remote.withFallback(custom
remoteSystem.actorOf(Props[RemoteActor](), "remote")
}
- override def afterTermination(): Unit = {
- Await.ready(remoteSystem.terminate(), Duration.Inf)
- }
+ override def afterTermination(): Unit =
+ remoteSystem.close()
"A message serializer" must {
"custom-serialize PersistentRepr messages during remoting" in {
diff --git
a/persistence/src/test/scala/org/apache/pekko/persistence/EndToEndEventAdapterSpec.scala
b/persistence/src/test/scala/org/apache/pekko/persistence/EndToEndEventAdapterSpec.scala
index 9620edc3ad..81f169d107 100644
---
a/persistence/src/test/scala/org/apache/pekko/persistence/EndToEndEventAdapterSpec.scala
+++
b/persistence/src/test/scala/org/apache/pekko/persistence/EndToEndEventAdapterSpec.scala
@@ -16,8 +16,6 @@ package org.apache.pekko.persistence
import java.io.File
import scala.annotation.nowarn
-import scala.concurrent.Await
-import scala.concurrent.duration._
import org.apache.commons.io.FileUtils
@@ -187,7 +185,7 @@ class EndToEndEventAdapterSpec extends AnyWordSpecLike with
Matchers with Before
def withActorSystem[T](name: String, config: Config)(block: ActorSystem =>
T): T = {
val system = ActorSystem(name, journalConfig.withFallback(config))
try block(system)
- finally Await.ready(system.terminate(), 3.seconds)
+ finally system.close()
}
"EventAdapters in end-to-end scenarios" must {
diff --git
a/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala
b/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala
index f13ea78e8f..73dff9928d 100644
---
a/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala
+++
b/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala
@@ -184,7 +184,7 @@ class SystemMessageDeliverySpec extends
AbstractSystemMessageDeliverySpec(System
watch(remoteRef)
remoteRef ! "hello"
expectMsg("hello")
- Await.ready(systemC.terminate(), 10.seconds)
+ systemC.close()
system.log.debug("systemC terminated")
// DeathWatchNotification is sent from systemC, failure detection
takes longer than 3 seconds
expectTerminated(remoteRef, 10.seconds)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]