TisonKun closed pull request #6701: [FLINK-10349] Unify stopActor utils
URL: https://github.com/apache/flink/pull/6701
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
Because this pull request comes from a fork, the diff is supplied below
(GitHub does not display the diff of a fork's pull request once it is merged):
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 2b8abb1f8ff..85952da64ea 100644
---
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -22,6 +22,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.akka.ActorUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClientActorTest;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -29,7 +30,6 @@
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
@@ -148,7 +148,7 @@ public void testJobManagerRetrievalWithHAServices() throws
Exception {
assertEquals(leaderId, gateway.leaderSessionID());
} finally {
if (actorRef != null) {
- TestingUtils.stopActorGracefully(actorRef);
+ ActorUtils.stopActorGracefully(actorRef);
}
actorSystem.shutdown();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
index 9a992814235..5f976e701ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
@@ -19,17 +19,19 @@
package org.apache.flink.runtime.akka;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.instance.ActorGateway;
import akka.actor.ActorRef;
import akka.actor.Kill;
-import akka.actor.PoisonPill;
import akka.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
@@ -87,12 +89,52 @@
return FutureUtils.completeAll(terminationFutures);
}
- public static void stopActor(AkkaActorGateway akkaActorGateway) {
- stopActor(akkaActorGateway.actor());
- }
+ // ---------- Utils to stop an actor ----------
+
+ private static final FiniteDuration DEFAULT_TIMEOUT =
FiniteDuration.apply(1, TimeUnit.MINUTES);
public static void stopActor(ActorRef actorRef) {
- actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ if (actorRef != null) {
+ actorRef.tell(Kill.getInstance(), ActorRef.noSender());
+ }
+ }
+
+ public static void stopActor(ActorGateway actorGateway) {
+ if (actorGateway != null) {
+ stopActor(actorGateway.actor());
+ }
+ }
+
+ public static void stopActorGracefully(ActorRef actorRef) {
+ stopActorsGracefully(actorRef);
+ }
+
+ public static void stopActorGracefully(ActorGateway actorGateway) {
+ stopActorGracefully(actorGateway.actor());
+ }
+
+ public static void stopActorsGracefully(@Nonnull ActorRef... actorRefs)
{
+ List<CompletableFuture<?>> futures = new
ArrayList<>(actorRefs.length);
+
+ for (ActorRef actorRef : actorRefs) {
+ if (actorRef != null) {
+
futures.add(FutureUtils.toJava(Patterns.gracefulStop(actorRef,
DEFAULT_TIMEOUT)));
+ }
+ }
+
+ FutureUtils.waitForAll(futures);
+ }
+
+ public static void stopActorsGracefully(@Nonnull ActorGateway...
actorGateways) {
+ List<ActorRef> actorRefs = new
ArrayList<>(actorGateways.length);
+
+ for (ActorGateway actorGateway : actorGateways) {
+ if (actorGateway != null) {
+ actorRefs.add(actorGateway.actor());
+ }
+ }
+
+ stopActorsGracefully(actorRefs.toArray(new ActorRef[0]));
}
private ActorUtils() {}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
index 2d39e0b972d..9f4a3b2c0a3 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
@@ -25,7 +25,6 @@
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import akka.actor.Kill;
import akka.actor.Props;
import akka.actor.RobustActorSystem;
import akka.testkit.JavaTestKit;
@@ -81,7 +80,7 @@ public void
testLeaderSessionMessageFilteringOfFlinkUntypedActor() {
assertEquals(3, underlyingActor.getMessageCounter());
} finally {
- stopActor(actor);
+ ActorUtils.stopActor(actor);
}
}
@@ -114,13 +113,7 @@ public void
testThrowingExceptionWhenReceivingNonWrappedRequiresLeaderSessionIDM
}
} finally {
- stopActor(actor);
- }
- }
-
- private static void stopActor(ActorRef actor) {
- if (actor != null) {
- actor.tell(Kill.getInstance(), ActorRef.noSender());
+ ActorUtils.stopActor(actor);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
index a998fb0ac8c..eab262caafb 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.akka;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
@@ -130,9 +129,9 @@ public void testWatcheeQuarantined() throws
ExecutionException, InterruptedExcep
Assert.assertEquals(actorSystem1Address.toString(),
quarantineFuture.get());
} finally {
- TestingUtils.stopActor(watchee);
- TestingUtils.stopActor(watcher);
- TestingUtils.stopActor(monitor);
+ ActorUtils.stopActor(watchee);
+ ActorUtils.stopActor(watcher);
+ ActorUtils.stopActor(monitor);
}
}
@@ -171,9 +170,9 @@ public void testWatcherQuarantining() throws
ExecutionException, InterruptedExce
Assert.assertEquals(actorSystem1Address.toString(),
quarantineFuture.get());
} finally {
- TestingUtils.stopActor(watchee);
- TestingUtils.stopActor(watcher);
- TestingUtils.stopActor(monitor);
+ ActorUtils.stopActor(watchee);
+ ActorUtils.stopActor(watcher);
+ ActorUtils.stopActor(monitor);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
index 388572c70ef..de395238032 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
@@ -22,6 +22,7 @@
import akka.testkit.JavaTestKit;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.ActorUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import
org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
@@ -42,8 +43,6 @@
import scala.Option;
-import java.util.Arrays;
-
/**
* Runs tests to ensure that a cluster is shutdown properly.
*/
@@ -129,8 +128,7 @@ protected void run() {
StopClusterSuccessful.getInstance()
);
} finally {
-
TestingUtils.stopActorGatewaysGracefully(Arrays.asList(
- jobManager, taskManager,
forwardingActor));
+ ActorUtils.stopActorsGracefully(jobManager,
taskManager, forwardingActor);
}
}};
@@ -207,8 +205,7 @@ protected void run() {
StopClusterSuccessful.getInstance()
);
} finally {
-
TestingUtils.stopActorGatewaysGracefully(Arrays.asList(
- jobManager, taskManager,
resourceManager, forwardingActor));
+ ActorUtils.stopActorsGracefully(jobManager,
taskManager, resourceManager, forwardingActor);
}
}};
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
index d52afe4b4ea..7af84a4888d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
@@ -21,6 +21,7 @@
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.ActorUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -42,9 +43,6 @@
import scala.Option;
-
-import java.util.Arrays;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -152,8 +150,7 @@ protected void run() {
assertEquals(1, reply.resources.size());
assertTrue(reply.resources.contains(resourceID));
} finally {
-
TestingUtils.stopActorGatewaysGracefully(Arrays.asList(
- jobManager, resourceManager,
forwardingActor));
+ ActorUtils.stopActorsGracefully(jobManager,
resourceManager, forwardingActor);
}
}};
@@ -215,8 +212,7 @@ protected void run() {
assertEquals(1, reply.resources.size());
} finally {
-
TestingUtils.stopActorGatewaysGracefully(Arrays.asList(
- jobManager, resourceManager,
taskManager, forwardingActor));
+ ActorUtils.stopActorsGracefully(jobManager,
resourceManager, taskManager, forwardingActor);
}
}};
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index d991983e6f2..757110e6ee1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -26,6 +26,7 @@
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.akka.ActorUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobServer;
@@ -415,7 +416,7 @@ public void testFailingJobRecovery() throws Exception {
// verify that the JobManager terminated
testProbe.expectTerminated(jobManager, timeout);
} finally {
- TestingUtils.stopActor(jobManager);
+ ActorUtils.stopActor(jobManager);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 3b502db0d7d..cf2167a3cda 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -20,6 +20,7 @@
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.ActorUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
@@ -116,7 +117,7 @@ public void testLeaderElection() throws Exception {
Await.ready(leaderFuture, duration);
} finally {
- TestingUtils.stopActor(jm);
+ ActorUtils.stopActor(jm);
}
}
@@ -162,7 +163,7 @@ public void testLeaderReelection() throws Exception {
Await.ready(leader2Future, duration);
} finally {
- TestingUtils.stopActor(jm2);
+ ActorUtils.stopActor(jm2);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
index 0e837a59c17..eebd25d5d47 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
@@ -277,8 +278,8 @@ protected void run() {
}
};
} finally {
- TestingUtils.stopActor(jobManger);
- TestingUtils.stopActor(taskManager);
+ ActorUtils.stopActor(jobManger);
+ ActorUtils.stopActor(taskManager);
highAvailabilityServices.closeAndCleanupAllData();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
index ccf5f60a824..5e43095db77 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
@@ -193,8 +194,8 @@ protected void run() {
}
};
} finally {
- TestingUtils.stopActor(jobManger);
- TestingUtils.stopActor(taskManager);
+ ActorUtils.stopActor(jobManger);
+ ActorUtils.stopActor(taskManager);
highAvailabilityServices.closeAndCleanupAllData();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 7ee0921646a..51529543617 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -26,6 +26,7 @@
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
@@ -60,15 +61,13 @@
import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
-import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import static org.apache.flink.runtime.testingUtils.TestingUtils.stopActor;
import static
org.apache.flink.runtime.testingUtils.TestingUtils.createTaskManager;
-import static
org.apache.flink.runtime.testingUtils.TestingUtils.stopActorGatewaysGracefully;
-import static
org.apache.flink.runtime.testingUtils.TestingUtils.stopActorGracefully;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -189,7 +188,7 @@ public void testSimpleRegistration() throws Exception {
e.printStackTrace();
fail(e.getMessage());
} finally {
-
stopActorGatewaysGracefully(Arrays.asList(taskManager1, taskManager2,
jobManager, resourceManager));
+ ActorUtils.stopActorsGracefully(taskManager1,
taskManager2, jobManager, resourceManager);
embeddedHaServices.closeAndCleanupAllData();
}
@@ -240,7 +239,7 @@ public void testDelayedRegistration() throws Exception {
assertTrue(response instanceof
TaskManagerMessages.RegisteredAtJobManager);
} finally {
-
stopActorGatewaysGracefully(Arrays.asList(taskManager, jobManager));
+ ActorUtils.stopActorsGracefully(taskManager,
jobManager);
embeddedHaServices.closeAndCleanupAllData();
}
@@ -301,7 +300,7 @@ protected void run() {
e.printStackTrace();
fail(e.getMessage());
} finally {
- stopActorGracefully(taskManager);
+ ActorUtils.stopActorsGracefully(taskManager);
}
}};
}
@@ -371,7 +370,7 @@ protected void run() {
}
};
} finally {
-
stopActorGatewaysGracefully(Arrays.asList(taskManager, jm));
+ ActorUtils.stopActorsGracefully(taskManager,
jm);
}
}};
}
@@ -469,7 +468,7 @@ protected RegisterTaskManager match(Object msg) throws
Exception {
+
maxExpectedNumberOfRegisterTaskManagerMessages,
registerTaskManagerMessages.length <=
maxExpectedNumberOfRegisterTaskManagerMessages);
} finally {
-
stopActorGatewaysGracefully(Arrays.asList(taskManager, jm));
+ ActorUtils.stopActorsGracefully(taskManager,
jm);
}
}};
}
@@ -533,7 +532,7 @@ protected void run() {
// kill the first forwarding JobManager
watch(fakeJobManager1Gateway.actor());
- stopActor(fakeJobManager1Gateway.actor());
+
ActorUtils.stopActorsGracefully(fakeJobManager1Gateway.actor());
final ActorGateway gateway =
fakeJobManager1Gateway;
@@ -598,7 +597,7 @@ protected void run() {
e.printStackTrace();
fail(e.getMessage());
} finally {
-
stopActorGatewaysGracefully(Arrays.asList(taskManagerGateway,
fakeJobManager2Gateway));
+
ActorUtils.stopActorsGracefully(taskManagerGateway, fakeJobManager2Gateway);
}
}};
}
@@ -675,7 +674,7 @@ protected void run() {
}
};
} finally {
- stopActorGracefully(taskManagerGateway);
+
ActorUtils.stopActorsGracefully(taskManagerGateway);
}
}};
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 2da2ba49265..9d66cca6c82 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -24,6 +24,7 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.blob.PermanentBlobKey;
@@ -268,8 +269,8 @@ else if (!(message instanceof
TaskManagerMessages.Heartbeat)) {
}
finally {
// shut down the actors
- TestingUtils.stopActor(taskManager);
- TestingUtils.stopActor(jobManager);
+ ActorUtils.stopActor(taskManager);
+ ActorUtils.stopActor(jobManager);
}
}};
}
@@ -409,8 +410,8 @@ protected void run() {
fail(e.getMessage());
}
finally {
- TestingUtils.stopActor(taskManager);
- TestingUtils.stopActor(jobManager);
+ ActorUtils.stopActor(taskManager);
+ ActorUtils.stopActor(jobManager);
}
}};
}
@@ -539,8 +540,8 @@ protected void run() {
};
}
finally {
- TestingUtils.stopActor(taskManager);
- TestingUtils.stopActor(jobManager);
+ ActorUtils.stopActor(taskManager);
+ ActorUtils.stopActor(jobManager);
}
}};
}
@@ -636,8 +637,8 @@ protected void run() {
}
finally {
// shut down the actors
- TestingUtils.stopActor(taskManager);
- TestingUtils.stopActor(jobManager);
+ ActorUtils.stopActor(taskManager);
+ ActorUtils.stopActor(jobManager);
}
}};
}
@@ -775,8 +776,8 @@ protected void run() {
}
finally {
// shut down the actors
- TestingUtils.stopActor(taskManager);
- TestingUtils.stopActor(jobManager);
+ ActorUtils.stopActor(taskManager);
+ ActorUtils.stopActor(jobManager);
}
}};
}
@@ -924,8 +925,8 @@ protected void run() {
}
finally {
// shut down the actors
- TestingUtils.stopActor(taskManager);
- TestingUtils.stopActor(jobManager);
+ ActorUtils.stopActor(taskManager);
+ ActorUtils.stopActor(jobManager);
}
}};
}
@@ -1027,8 +1028,8 @@ protected void run() {
fail(e.getMessage());
}
finally {
- TestingUtils.stopActor(taskManager);
- TestingUtils.stopActor(jobManager);
+ ActorUtils.stopActor(taskManager);
+ ActorUtils.stopActor(jobManager);
}
}};
}
@@ -1148,8 +1149,8 @@ protected void run() {
fail(e.getMessage());
}
finally {
- TestingUtils.stopActor(taskManager);
- TestingUtils.stopActor(jobManager);
+ ActorUtils.stopActor(taskManager);
+ ActorUtils.stopActor(jobManager);
}
}};
}
@@ -1206,8 +1207,8 @@ protected void run() {
}
};
} finally {
- TestingUtils.stopActor(taskManager);
- TestingUtils.stopActor(jobManager);
+ ActorUtils.stopActor(taskManager);
+ ActorUtils.stopActor(jobManager);
}
}};}
@@ -1519,8 +1520,8 @@ protected void run() {
}
};
} finally {
- TestingUtils.stopActor(taskManagerActorGateway);
- TestingUtils.stopActor(jobManagerActorGateway);
+ ActorUtils.stopActor(taskManagerActorGateway);
+ ActorUtils.stopActor(jobManagerActorGateway);
}
}};
}
@@ -1618,8 +1619,8 @@ public void testFailingScheduleOrUpdateConsumersMessage()
throws Exception {
assertEquals(true, cancelFuture.get());
} finally {
- TestingUtils.stopActor(taskManager);
- TestingUtils.stopActor(jobManager);
+ ActorUtils.stopActor(taskManager);
+ ActorUtils.stopActor(jobManager);
}
}};
}
@@ -1679,8 +1680,8 @@ public void testSubmitTaskFailure() throws Exception {
// expected
}
} finally {
- TestingUtils.stopActor(jobManager);
- TestingUtils.stopActor(taskManager);
+ ActorUtils.stopActor(jobManager);
+ ActorUtils.stopActor(taskManager);
}
}
@@ -1748,8 +1749,8 @@ public void testStopTaskFailure() throws Exception {
// expected
}
} finally {
- TestingUtils.stopActor(jobManager);
- TestingUtils.stopActor(taskManager);
+ ActorUtils.stopActor(jobManager);
+ ActorUtils.stopActor(taskManager);
}
}
@@ -1795,8 +1796,8 @@ public void testStackTraceSampleFailure() throws
Exception {
// expected
}
} finally {
- TestingUtils.stopActor(jobManager);
- TestingUtils.stopActor(taskManager);
+ ActorUtils.stopActor(jobManager);
+ ActorUtils.stopActor(taskManager);
}
}
@@ -1866,8 +1867,8 @@ public void testUpdateTaskInputPartitionsFailure() throws
Exception {
// expected
}
} finally {
- TestingUtils.stopActor(jobManager);
- TestingUtils.stopActor(taskManager);
+ ActorUtils.stopActor(jobManager);
+ ActorUtils.stopActor(taskManager);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
index 35fa7b82a02..9948704eafc 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
@@ -19,12 +19,12 @@
package org.apache.flink.runtime.webmonitor.retriever.impl;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.ActorUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClientActorTest;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import akka.actor.ActorRef;
@@ -100,7 +100,7 @@ public void testAkkaJobManagerRetrieval() throws Exception {
settableLeaderRetrievalService.stop();
if (actorRef != null) {
- TestingUtils.stopActorGracefully(actorRef);
+ ActorUtils.stopActorGracefully(actorRef);
}
}
}
diff --git
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/FlinkActorTest.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/FlinkActorTest.scala
index acd48469c66..f4df1056b02 100644
---
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/FlinkActorTest.scala
+++
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/FlinkActorTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.akka
import java.util.UUID
-import akka.actor.{Props, Kill, ActorRef, ActorSystem}
+import akka.actor.{Props, ActorSystem}
import akka.testkit.{TestActorRef, TestKit}
import grizzled.slf4j.Logger
import
org.apache.flink.runtime.akka.FlinkUntypedActorTest.PlainRequiresLeaderSessionID
@@ -82,11 +82,6 @@ class FlinkActorTest(_system: ActorSystem)
s"leader session ID, even though the message requires a leader
session ID.")
}
}
-
- def stopActor(actor: ActorRef): Unit = {
- actor ! Kill
- }
-
}
class PlainFlinkActor(val leaderSessionID: Option[UUID])
diff --git
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 887c4f5d377..54108153eda 100644
---
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{Executors,
ScheduledExecutorService, TimeUnit}
import akka.actor._
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.akka.{ActorUtils, AkkaUtils}
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.highavailability.HighAvailabilityServices
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.instance._
import
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwardingActor
import
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
import
org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration,
AlreadyRegistered, RegisterTaskManager}
-import org.apache.flink.runtime.metrics.{MetricRegistryImpl,
MetricRegistryConfiguration}
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration,
MetricRegistryImpl}
import org.apache.flink.runtime.taskmanager.TaskManagerLocation
import
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenLeader
import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingUtils}
@@ -150,7 +150,7 @@ ImplicitSender with WordSpecLike with Matchers with
BeforeAndAfterAll with Befor
val response = probe.expectMsgType[LeaderSessionMessage]
response match {
- case LeaderSessionMessage(leaderSessionID,
AcknowledgeRegistration(id, _)) => id2 = id
+ case LeaderSessionMessage(_, AcknowledgeRegistration(id, _)) =>
id2 = id
case _ => fail("Wrong response message: " + response)
}
}
@@ -159,10 +159,10 @@ ImplicitSender with WordSpecLike with Matchers with
BeforeAndAfterAll with Befor
assertNotNull(id2)
assertNotEquals(id1, id2)
} finally {
- jmOption.foreach(TestingUtils.stopActorGracefully)
- rmOption.foreach(TestingUtils.stopActorGracefully)
- tm1Option.foreach(TestingUtils.stopActorGracefully)
- tm2Option.foreach(TestingUtils.stopActorGracefully)
+ jmOption.foreach(ActorUtils.stopActorGracefully)
+ rmOption.foreach(ActorUtils.stopActorGracefully)
+ tm1Option.foreach(ActorUtils.stopActorGracefully)
+ tm2Option.foreach(ActorUtils.stopActorGracefully)
}
}
@@ -239,8 +239,8 @@ ImplicitSender with WordSpecLike with Matchers with
BeforeAndAfterAll with Befor
}
}
} finally {
- jmOption.foreach(TestingUtils.stopActorGracefully)
- rmOption.foreach(TestingUtils.stopActorGracefully)
+ jmOption.foreach(ActorUtils.stopActorGracefully)
+ rmOption.foreach(ActorUtils.stopActorGracefully)
}
}
}
@@ -277,8 +277,8 @@ ImplicitSender with WordSpecLike with Matchers with
BeforeAndAfterAll with Befor
components._6,
highAvailabilityServices.getJobManagerLeaderElectionService(
HighAvailabilityServices.DEFAULT_JOB_ID),
- highAvailabilityServices.getSubmittedJobGraphStore(),
- highAvailabilityServices.getCheckpointRecoveryFactory(),
+ highAvailabilityServices.getSubmittedJobGraphStore,
+ highAvailabilityServices.getCheckpointRecoveryFactory,
components._9,
components._10,
None)
diff --git
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 9d791a90f9d..8eae515ac99 100644
---
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -22,8 +22,8 @@ import java.util
import java.util.concurrent._
import java.util.{Collections, UUID}
-import akka.actor.{ActorRef, ActorSystem, Kill, Props}
-import akka.pattern.{Patterns, ask}
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.pattern.ask
import com.typesafe.config.ConfigFactory
import grizzled.slf4j.Logger
import org.apache.flink.api.common.time.Time
@@ -295,76 +295,6 @@ object TestingUtils {
new AkkaActorGateway(taskManager, leaderId)
}
- /** Stops the given actor by sending it a Kill message
- *
- * @param actor
- */
- def stopActor(actor: ActorRef): Unit = {
- if (actor != null) {
- actor ! Kill
- }
- }
-
- /** Stops the given actor by sending it a Kill message
- *
- * @param actorGateway
- */
- def stopActor(actorGateway: ActorGateway): Unit = {
- if (actorGateway != null) {
- stopActor(actorGateway.actor())
- }
- }
-
- def stopActorGracefully(actor: ActorRef): Unit = {
- val gracefulStopFuture = Patterns.gracefulStop(actor,
TestingUtils.TESTING_TIMEOUT)
-
- Await.result(gracefulStopFuture, TestingUtils.TESTING_TIMEOUT)
- }
-
- def stopActorGracefully(actorGateway: ActorGateway): Unit = {
- stopActorGracefully(actorGateway.actor())
- }
-
- def stopActorsGracefully(actors: ActorRef*): Unit = {
- val gracefulStopFutures = actors.flatMap{
- actor =>
- Option(actor) match {
- case Some(actorRef) => Some(Patterns.gracefulStop(actorRef,
TestingUtils.TESTING_TIMEOUT))
- case None => None
- }
- }
-
- implicit val executionContext = defaultExecutionContext
-
- val globalStopFuture =
scala.concurrent.Future.sequence(gracefulStopFutures)
-
- Await.result(globalStopFuture, TestingUtils.TESTING_TIMEOUT)
- }
-
- def stopActorsGracefully(actors: java.util.List[ActorRef]): Unit = {
- import scala.collection.JavaConverters._
-
- stopActorsGracefully(actors.asScala: _*)
- }
-
- def stopActorGatewaysGracefully(actorGateways: ActorGateway*): Unit = {
- val actors = actorGateways.flatMap {
- actorGateway =>
- Option(actorGateway) match {
- case Some(actorGateway) => Some(actorGateway.actor())
- case None => None
- }
- }
-
- stopActorsGracefully(actors: _*)
- }
-
- def stopActorGatewaysGracefully(actorGateways:
java.util.List[ActorGateway]): Unit = {
- import scala.collection.JavaConverters._
-
- stopActorGatewaysGracefully(actorGateways.asScala: _*)
- }
-
/** Creates a testing JobManager using the default recovery mode (standalone)
*
* @param actorSystem The ActorSystem to use
diff --git
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 5c9b1fb0c23..9c314d042a0 100644
---
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.runtime.jobmanager
import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.configuration.{ConfigConstants, Configuration,
JobManagerOptions, TaskManagerOptions}
-import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
+import org.apache.flink.runtime.akka.{ActorUtils, AkkaUtils,
ListeningBehaviour}
import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
import org.apache.flink.runtime.messages.Acknowledge
import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -55,7 +55,7 @@ class JobManagerFailsITCase(_system: ActorSystem)
val cluster = startDeathwatchCluster(num_slots, 1)
try {
- val tm = cluster.getTaskManagers(0)
+ val tm = cluster.getTaskManagers.head
val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
// disable disconnect message to test death watch
@@ -66,7 +66,7 @@ class JobManagerFailsITCase(_system: ActorSystem)
expectMsg(1)
// stop the current leader and make sure that he is gone
- TestingUtils.stopActorGracefully(jmGateway)
+ ActorUtils.stopActorGracefully(jmGateway)
cluster.restartLeadingJobManager()
@@ -99,14 +99,13 @@ class JobManagerFailsITCase(_system: ActorSystem)
try {
var jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
- val tm = cluster.getTaskManagers(0)
within(TestingUtils.TESTING_DURATION) {
jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
self)
expectMsg(JobSubmitSuccess(jobGraph.getJobID))
// stop the current leader and make sure that he is gone
- TestingUtils.stopActorGracefully(jmGateway)
+ ActorUtils.stopActorGracefully(jmGateway)
cluster.restartLeadingJobManager()
@@ -119,11 +118,11 @@ class JobManagerFailsITCase(_system: ActorSystem)
jmGateway.tell(SubmitJob(jobGraph2,
ListeningBehaviour.EXECUTION_RESULT), self)
- expectMsg(JobSubmitSuccess(jobGraph2.getJobID()))
+ expectMsg(JobSubmitSuccess(jobGraph2.getJobID))
val result = expectMsgType[JobResultSuccess]
- result.result.getJobId() should equal(jobGraph2.getJobID)
+ result.result.getJobId should equal(jobGraph2.getJobID)
}
} finally {
cluster.stop()
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services