asfgit closed pull request #6446: [FLINK-9240] Avoid deprecated Akka methods
URL: https://github.com/apache/flink/pull/6446
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once such a pull request is merged,
the diff is reproduced below for the sake of provenance:

Because this is a foreign pull request (opened from a fork), GitHub no longer
shows its diff after the merge, so the full diff is supplied below:

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 373267ffc34..309010e4bf0 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -90,6 +90,7 @@
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -249,8 +250,8 @@ public boolean isLoaded() {
                @Override
                public void close() throws Exception {
                        if (isLoaded()) {
-                               actorSystem.shutdown();
-                               actorSystem.awaitTermination();
+                               actorSystem.terminate();
+                               Await.ready(actorSystem.whenTerminated(), 
Duration.Inf());
                                actorSystem = null;
                        }
                }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index ca8b6fbabf7..ec9cfc548c1 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -67,6 +67,9 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
+
 /**
  * Simple and maybe stupid test to check the {@link ClusterClient} class.
  */
@@ -114,8 +117,8 @@ public void setUp() throws Exception {
        public void shutDownActorSystem() {
                if (jobManagerSystem != null) {
                        try {
-                               jobManagerSystem.shutdown();
-                               jobManagerSystem.awaitTermination();
+                               jobManagerSystem.terminate();
+                               Await.ready(jobManagerSystem.whenTerminated(), 
Duration.Inf());
                        } catch (Exception e) {
                                e.printStackTrace();
                                fail(e.getMessage());
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 630fa839000..1e0670efc5f 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -57,6 +57,8 @@
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
+import akka.actor.Terminated;
+import akka.dispatch.OnComplete;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
@@ -73,10 +75,14 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
+import static 
org.apache.flink.runtime.concurrent.Executors.directExecutionContext;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -374,11 +380,14 @@ protected int runPrivileged(Configuration config, 
Configuration dynamicPropertie
                        }
 
                        if (actorSystem != null) {
-                               try {
-                                       actorSystem.shutdown();
-                               } catch (Throwable tt) {
-                                       LOG.error("Error shutting down actor 
system", tt);
-                               }
+                               actorSystem.terminate().onComplete(
+                                       new OnComplete<Terminated>() {
+                                               public void 
onComplete(Throwable failure, Terminated result) {
+                                                       if (failure != null) {
+                                                               
LOG.error("Error shutting down actor system", failure);
+                                                       }
+                                               }
+                                       }, directExecutionContext());
                        }
 
                        if (futureExecutor != null) {
@@ -412,7 +421,11 @@ protected int runPrivileged(Configuration config, 
Configuration dynamicPropertie
                LOG.info("Mesos JobManager started");
 
                // wait until everything is done
-               actorSystem.awaitTermination();
+               try {
+                       Await.ready(actorSystem.whenTerminated(), 
Duration.Inf());
+               } catch (InterruptedException | TimeoutException e) {
+                       LOG.error("Error shutting down actor system", e);
+               }
 
                // if we get here, everything work out jolly all right, and we 
even exited smoothly
                if (webMonitor != null) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
index 708437ff8cb..38c29bd7156 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
@@ -25,6 +25,9 @@
 import akka.actor.Address;
 import org.slf4j.Logger;
 
+import java.util.concurrent.TimeoutException;
+
+import scala.concurrent.Await;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -65,11 +68,13 @@ public void hasQuarantined(String remoteSystem, ActorSystem 
actorSystem) {
 
        private void shutdownActorSystem(ActorSystem actorSystem) {
                // shut the actor system down
-               actorSystem.shutdown();
+               actorSystem.terminate();
 
                try {
                        // give it some time to complete the shutdown
-                       actorSystem.awaitTermination(timeout);
+                       Await.ready(actorSystem.whenTerminated(), timeout);
+               } catch (InterruptedException | TimeoutException e) {
+                       log.error("Exception thrown when terminating the actor 
system", e);
                } finally {
                        // now let's crash the JVM
                        System.exit(exitCode);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
index 0b0cbf5cb01..808de222d0a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
@@ -43,11 +43,13 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -154,8 +156,12 @@ public Configuration getConfiguration() {
                        exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                }
 
-               actorSystem.shutdown();
-               actorSystem.awaitTermination();
+               actorSystem.terminate();
+               try {
+                       Await.ready(actorSystem.whenTerminated(), 
Duration.Inf());
+               } catch (InterruptedException | TimeoutException e) {
+                       exception = e;
+               }
 
                try {
                        highAvailabilityServices.closeAndCleanupAllData();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
index 8bb357648bf..b166e1066ca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
@@ -106,7 +106,7 @@ public void shutdown() {
        @Override
        public void run() {
                try {
-                       while (running && (monitored == null || 
!monitored.isTerminated())) {
+                       while (running && (monitored == null || 
!monitored.whenTerminated().isCompleted())) {
                                
logger.info(getMemoryUsageStatsAsString(memoryBean));
                                
logger.info(getDirectMemoryStatsAsString(directBufferBean));
                                
logger.info(getMemoryPoolStatsAsString(poolBeans));
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java
index e19cfd70a92..db0a1dcaf2b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java
@@ -20,6 +20,7 @@
 
 import akka.actor.ActorSystem;
 import org.slf4j.Logger;
+import scala.concurrent.Await;
 import scala.concurrent.duration.Duration;
 
 import java.util.concurrent.TimeoutException;
@@ -67,7 +68,7 @@ public ProcessShutDownThread(
        @Override
        public void run() {
                try {
-                       actorSystem.awaitTermination(terminationTimeout);
+                       Await.ready(actorSystem.whenTerminated(), 
terminationTimeout);
                } catch (Exception e) {
                        if (e instanceof TimeoutException) {
                                log.error("Actor system shut down timed out.", 
e);
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2a8f49267d9..cf831e32340 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.clusterframework.messages._
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.clusterframework.{BootstrapTools, 
FlinkResourceManager}
+import org.apache.flink.runtime.concurrent.Executors.directExecutionContext
 import org.apache.flink.runtime.concurrent.{FutureUtils, 
ScheduledExecutorServiceAdapter}
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder
@@ -1867,7 +1868,10 @@ class JobManager(
       FiniteDuration(10, SECONDS)).start()
 
     // Shutdown and discard all queued messages
-    context.system.shutdown()
+    context.system.terminate().onComplete {
+      case scala.util.Success(_) =>
+      case scala.util.Failure(t) => log.warn("Could not cleanly shut down 
actor system", t)
+    }(directExecutionContext())
   }
 
   private def instantiateMetrics(jobManagerMetricGroup: MetricGroup) : Unit = {
@@ -2046,7 +2050,7 @@ object JobManager {
     }
 
     // block until everything is shut down
-    jobManagerSystem.awaitTermination()
+    Await.ready(jobManagerSystem.whenTerminated, Duration.Inf)
 
     webMonitorOption.foreach{
       webMonitor =>
@@ -2288,11 +2292,10 @@ object JobManager {
     catch {
       case t: Throwable =>
         LOG.error("Error while starting up JobManager", t)
-        try {
-          jobManagerSystem.shutdown()
-        } catch {
-          case tt: Throwable => LOG.warn("Could not cleanly shut down actor 
system", tt)
-        }
+        jobManagerSystem.terminate().onComplete {
+          case scala.util.Success(_) =>
+          case scala.util.Failure(tt) => LOG.warn("Could not cleanly shut down 
actor system", tt)
+        }(directExecutionContext())
         throw t
     }
   }
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 8d9e1ee1e07..4f70a22f9c0 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
 import scala.concurrent._
+import scala.util.{Failure, Success}
 
 /**
  * Abstract base class for Flink's mini cluster. The mini cluster starts a
@@ -479,30 +480,30 @@ abstract class FlinkMiniCluster(
 
     if (!useSingleActorSystem) {
       taskManagerActorSystems foreach {
-        _ foreach(_.shutdown())
+        _ foreach(_.terminate())
       }
 
       resourceManagerActorSystems foreach {
-        _ foreach(_.shutdown())
+        _ foreach(_.terminate())
       }
     }
 
     jobManagerActorSystems foreach {
-      _ foreach(_.shutdown())
+      _ foreach(_.terminate())
     }
   }
 
   def awaitTermination(): Unit = {
     jobManagerActorSystems foreach {
-      _ foreach(_.awaitTermination())
+      _ foreach(s => Await.ready(s.whenTerminated, Duration.Inf))
     }
 
     resourceManagerActorSystems foreach {
-      _ foreach(_.awaitTermination())
+      _ foreach(s => Await.ready(s.whenTerminated, Duration.Inf))
     }
 
     taskManagerActorSystems foreach {
-      _ foreach(_.awaitTermination())
+      _ foreach(s => Await.ready(s.whenTerminated, Duration.Inf))
     }
   }
 
@@ -625,7 +626,10 @@ abstract class FlinkMiniCluster(
 
   def shutdownJobClientActorSystem(actorSystem: ActorSystem): Unit = {
     if(!useSingleActorSystem) {
-      actorSystem.shutdown()
+      actorSystem.terminate().onComplete {
+        case Success(_) =>
+        case Failure(t) => LOG.warn("Could not cleanly shut down the job 
client actor system.", t)
+      }
     }
   }
 
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 9a057ab62b2..8db4c160d04 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -77,6 +77,7 @@ import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.language.postfixOps
+import scala.util.{Failure, Success}
 
 /**
  * The TaskManager is responsible for executing the individual tasks of a 
Flink job. It is
@@ -423,7 +424,7 @@ class TaskManager(
               futureResponse.mapTo[Boolean].onComplete {
                 // IMPORTANT: In the future callback, we cannot directly 
modify state
                 //            but only send messages to the TaskManager to do 
those changes
-                case scala.util.Success(result) =>
+                case Success(result) =>
                   if (!result) {
                   self ! decorateMessage(
                     FailTask(
@@ -432,7 +433,7 @@ class TaskManager(
                     )
                   }
 
-                case scala.util.Failure(t) =>
+                case Failure(t) =>
                 self ! decorateMessage(
                   FailTask(
                     executionID,
@@ -839,10 +840,10 @@ class TaskManager(
             blobCache.get.getTransientBlobService.putTransient(fis)
           }(context.dispatcher)
             .onComplete {
-              case scala.util.Success(value) =>
+              case Success(value) =>
                 sender ! value
                 fis.close()
-              case scala.util.Failure(e) =>
+              case Failure(e) =>
                 sender ! akka.actor.Status.Failure(e)
                 fis.close()
             }(context.dispatcher)
@@ -1534,7 +1535,7 @@ class TaskManager(
   }
 
   protected def shutdown(): Unit = {
-    context.system.shutdown()
+    context.system.terminate()
 
     // Await actor system termination and shut down JVM
     new ProcessShutDownThread(
@@ -1897,15 +1898,14 @@ object TaskManager {
       }
 
       // block until everything is done
-      taskManagerSystem.awaitTermination()
+      Await.ready(taskManagerSystem.whenTerminated, Duration.Inf)
     } catch {
       case t: Throwable =>
         LOG.error("Error while starting up taskManager", t)
-        try {
-          taskManagerSystem.shutdown()
-        } catch {
-          case tt: Throwable => LOG.warn("Could not cleanly shut down actor 
system", tt)
-        }
+        taskManagerSystem.terminate().onComplete {
+          case Success(_) =>
+          case Failure(tt) => LOG.warn("Could not cleanly shut down actor 
system", tt)
+        }(Executors.directExecutionContext())
         throw t
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
index 37a45477cc3..a998fb0ac8c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
@@ -43,7 +43,10 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -72,10 +75,10 @@ public static void setup() {
        }
 
        @AfterClass
-       public static void tearDown() {
+       public static void tearDown() throws InterruptedException, 
TimeoutException {
                if (actorSystem1 != null) {
-                       actorSystem1.shutdown();
-                       actorSystem1.awaitTermination();
+                       actorSystem1.terminate();
+                       Await.ready(actorSystem1.whenTerminated(), 
Duration.Inf());
                }
        }
 
@@ -85,10 +88,10 @@ public void setupTest() {
        }
 
        @After
-       public void tearDownTest() {
+       public void tearDownTest() throws InterruptedException, 
TimeoutException {
                if (actorSystem2 != null) {
-                       actorSystem2.shutdown();
-                       actorSystem2.awaitTermination();
+                       actorSystem2.terminate();
+                       Await.ready(actorSystem2.whenTerminated(), 
Duration.Inf());
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
index 0b7547df33a..ae8542bf9e7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
@@ -321,11 +321,11 @@ public void testClientNonDetachedListeningBehaviour() 
throws Exception {
                        }
 
                        if (taskManagerSystem != null) {
-                               taskManagerSystem.shutdown();
+                               taskManagerSystem.terminate();
                        }
 
                        if (testSystem != null) {
-                               testSystem.shutdown();
+                               testSystem.terminate();
                        }
 
                        highAvailabilityServices.closeAndCleanupAllData();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index 38b843146ed..fc164830904 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -182,7 +182,7 @@ public void testReapProcessOnFailure() {
                                jmProcess.destroy();
                        }
                        if (localSystem != null) {
-                               localSystem.shutdown();
+                               localSystem.terminate();
                        }
                }
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 873a4f1f4f4..052349cd043 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -132,6 +132,7 @@
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 import scala.reflect.ClassTag$;
 
@@ -940,7 +941,7 @@ public void testCancelWithSavepoint() throws Exception {
                        assertTrue(savepointFile.exists());
                } finally {
                        if (actorSystem != null) {
-                               actorSystem.shutdown();
+                               actorSystem.terminate();
                        }
 
                        if (archiver != null) {
@@ -956,7 +957,7 @@ public void testCancelWithSavepoint() throws Exception {
                        }
 
                        if (actorSystem != null) {
-                               actorSystem.awaitTermination(TESTING_TIMEOUT());
+                               Await.result(actorSystem.whenTerminated(), 
TESTING_TIMEOUT());
                        }
                }
        }
@@ -1130,7 +1131,7 @@ public void 
testCancelWithSavepointNoDirectoriesConfigured() throws Exception {
                        }
                } finally {
                        if (actorSystem != null) {
-                               actorSystem.shutdown();
+                               actorSystem.terminate();
                        }
 
                        if (archiver != null) {
@@ -1243,7 +1244,7 @@ public void 
testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception
                        assertEquals(1, targetDirectory.listFiles().length);
                } finally {
                        if (actorSystem != null) {
-                               actorSystem.shutdown();
+                               actorSystem.terminate();
                        }
 
                        if (archiver != null) {
@@ -1259,7 +1260,7 @@ public void 
testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception
                        }
 
                        if (actorSystem != null) {
-                               
actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
+                               Await.result(actorSystem.whenTerminated(), 
TestingUtils.TESTING_TIMEOUT());
                        }
                }
        }
@@ -1416,7 +1417,7 @@ public void testSavepointRestoreSettings() throws 
Exception {
                        assertTrue("Unexpected response: " + response, response 
instanceof JobSubmitSuccess);
                } finally {
                        if (actorSystem != null) {
-                               actorSystem.shutdown();
+                               actorSystem.terminate();
                        }
 
                        if (archiver != null) {
@@ -1432,7 +1433,7 @@ public void testSavepointRestoreSettings() throws 
Exception {
                        }
 
                        if (actorSystem != null) {
-                               
actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
+                               Await.ready(actorSystem.whenTerminated(), 
TestingUtils.TESTING_TIMEOUT());
                        }
                }
        }
@@ -1516,8 +1517,8 @@ public void testResourceManagerConnection() throws 
TimeoutException, Interrupted
 
                } finally {
                        // cleanup the actor system and with it all of the 
started actors if not already terminated
-                       actorSystem.shutdown();
-                       actorSystem.awaitTermination();
+                       actorSystem.terminate();
+                       Await.ready(actorSystem.whenTerminated(), 
Duration.Inf());
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index c7d837b00ed..ef493b9330a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -117,7 +117,7 @@ public static void setupJobManager() {
        @AfterClass
        public static void teardownJobmanager() throws Exception {
                if (jobManagerSystem != null) {
-                       jobManagerSystem.shutdown();
+                       jobManagerSystem.terminate();
                }
 
                if (highAvailabilityServices != null) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index db040232f69..3abde88e574 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -49,6 +49,8 @@
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -154,11 +156,11 @@ protected void run() {
                        Assert.assertFalse(metricRegistry.isShutdown());
 
                        // shut down the actors and the actor system
-                       actorSystem.shutdown();
-                       actorSystem.awaitTermination();
+                       actorSystem.terminate();
+                       Await.result(actorSystem.whenTerminated(), 
Duration.Inf());
                } finally {
                        if (actorSystem != null) {
-                               actorSystem.shutdown();
+                               actorSystem.terminate();
                        }
 
                        if (highAvailabilityServices != null) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 1acaf6181bb..3767421b7d6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -116,7 +116,7 @@ public long getCount() {
                testActor.message = null;
                assertEquals(0, emptyDump.serializedMetrics.length);
 
-               s.shutdown();
+               s.terminate();
        }
 
        private static class TestActor extends UntypedActor {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
index 786b0aeae2c..ed98be5d480 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
@@ -66,7 +66,7 @@ public static void setUp() throws Exception {
        @AfterClass
        public static void tearDown() throws Exception {
                if (system != null) {
-                       system.shutdown();
+                       system.terminate();
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 07d02443f1a..a32c1f64906 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -49,6 +49,8 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import scala.concurrent.Await;
+
 public class AkkaRpcActorTest extends TestLogger {
 
        // 
------------------------------------------------------------------------
@@ -259,8 +261,8 @@ public void testActorTerminationWhenServiceShutdown() 
throws Exception {
 
                        terminationFuture.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
                } finally {
-                       rpcActorSystem.shutdown();
-                       
rpcActorSystem.awaitTermination(FutureUtils.toFiniteDuration(timeout));
+                       rpcActorSystem.terminate();
+                       Await.ready(rpcActorSystem.whenTerminated(), 
FutureUtils.toFiniteDuration(timeout));
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 828993012b8..9669513a111 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -62,6 +62,8 @@
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.junit.Assert.assertTrue;
@@ -205,8 +207,8 @@ protected void run() {
                        jobManager.tell(Kill.getInstance(), ActorRef.noSender());
 
                        // shut down the actors and the actor system
-                       actorSystem.shutdown();
-                       actorSystem.awaitTermination();
+                       actorSystem.terminate();
+                       Await.ready(actorSystem.whenTerminated(), Duration.Inf());
                        actorSystem = null;
 
                        // now that the TaskManager is shut down, the components should be shut down as well
@@ -215,9 +217,9 @@ protected void run() {
                        assertTrue(memManager.isShutdown());
                } finally {
                        if (actorSystem != null) {
-                               actorSystem.shutdown();
+                               actorSystem.terminate();
 
-                               actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
+                               Await.ready(actorSystem.whenTerminated(), TestingUtils.TESTING_TIMEOUT());
                        }
 
                        highAvailabilityServices.closeAndCleanupAllData();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index ae66a0836ad..0b9b951f54f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -218,7 +218,7 @@ public void testReapProcessOnFailure() throws Exception {
                                taskManagerProcess.destroy();
                        }
                        if (jmActorSystem != null) {
-                               jmActorSystem.shutdown();
+                               jmActorSystem.terminate();
                        }
                        if (highAvailabilityServices != null) {
                                highAvailabilityServices.closeAndCleanupAllData();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index ad32a4fdd8d..7ee0921646a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -102,7 +102,7 @@ public static void startActorSystem() {
        @AfterClass
        public static void shutdownActorSystem() {
                if (actorSystem != null) {
-                       actorSystem.shutdown();
+                       actorSystem.terminate();
                }
        }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
index 94473b9581c..53145504ec8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
@@ -37,10 +37,13 @@
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
+import scala.concurrent.Await;
+
 /**
  * Test for the {@link AkkaJobManagerRetriever}.
  */
@@ -55,10 +58,10 @@ public static void setup() {
        }
 
        @AfterClass
-       public static void teardown() {
+       public static void teardown() throws InterruptedException, TimeoutException {
                if (actorSystem != null) {
-                       actorSystem.shutdown();
-                       actorSystem.awaitTermination(FutureUtils.toFiniteDuration(timeout));
+                       actorSystem.terminate();
+                       Await.ready(actorSystem.whenTerminated(), FutureUtils.toFiniteDuration(timeout));
 
                        actorSystem = null;
                }
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
index 6d7d87cbf2b..947d0291db3 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
@@ -77,7 +77,7 @@ class JobManagerConnectionTest {
         fail(e.getMessage)
     }
     finally {
-      actorSystem.shutdown()
+      actorSystem.terminate()
     }
   }
 
@@ -116,7 +116,7 @@ class JobManagerConnectionTest {
         fail(e.getMessage)
     }
     finally {
-      actorSystem.shutdown()
+      actorSystem.terminate()
     }
   }
 
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index c2d47f92d80..f722935f90d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.testutils.TestingResourceManager
 
+import scala.concurrent.duration.Duration
 import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{Await, ExecutionContext, Future}
 
@@ -229,8 +230,8 @@ class TestingCluster(
           Await.result(stopped, TestingCluster.MAX_RESTART_DURATION)
 
           if(!singleActorSystem) {
-            jmActorSystems(index).shutdown()
-            jmActorSystems(index).awaitTermination()
+            jmActorSystems(index).terminate()
+            Await.ready(jmActorSystems(index).whenTerminated, Duration.Inf)
           }
 
           val newJobManagerActorSystem = if(!singleActorSystem) {
@@ -274,8 +275,8 @@ class TestingCluster(
         Await.result(stopped, TestingCluster.MAX_RESTART_DURATION)
 
         if(!singleActorSystem) {
-          tmActorSystems(index).shutdown()
-          tmActorSystems(index).awaitTermination()
+          tmActorSystems(index).terminate()
+          Await.ready(tmActorSystems(index).whenTerminated, Duration.Inf)
         }
 
         val taskManagerActorSystem  = if(!singleActorSystem) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index ce750d3b067..a22a8a8c930 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -94,7 +94,9 @@
 import scala.Option;
 import scala.Some;
 import scala.Tuple2;
+import scala.concurrent.Await;
 import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
@@ -425,8 +427,8 @@ public Object getKey(Long value) throws Exception {
                                miniCluster.awaitTermination();
                        }
 
-                       system.shutdown();
-                       system.awaitTermination();
+                       system.terminate();
+                       Await.ready(system.whenTerminated(), Duration.Inf());
 
                        testingServer.stop();
                        testingServer.close();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index aa2f38d2619..00c6865300c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -38,8 +38,11 @@
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.TimeoutException;
 
+import scala.concurrent.Await;
 import scala.concurrent.ExecutionContext$;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.forkjoin.ForkJoinPool;
 import scala.concurrent.impl.ExecutionContextImpl;
 
@@ -62,7 +65,7 @@
        };
 
        @Test
-       public void testLocalFlinkMiniClusterWithMultipleTaskManagers() {
+       public void testLocalFlinkMiniClusterWithMultipleTaskManagers() throws InterruptedException, TimeoutException {
 
                final ActorSystem system = ActorSystem.create("Testkit", AkkaUtils.getDefaultAkkaConfig());
                LocalFlinkMiniCluster miniCluster = null;
@@ -117,7 +120,7 @@ protected void run() {
                        }
 
                        JavaTestKit.shutdownActorSystem(system);
-                       system.awaitTermination();
+                       Await.ready(system.whenTerminated(), Duration.Inf());
                }
 
                // shut down the global execution context, to make sure it does not affect this testing
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 497ac87c4a3..e98e174cbdb 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -57,6 +57,8 @@
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import akka.actor.Terminated;
+import akka.dispatch.OnComplete;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -71,11 +73,15 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import scala.Option;
 import scala.Some;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
+import static org.apache.flink.runtime.concurrent.Executors.directExecutionContext;
 import static org.apache.flink.yarn.Utils.require;
 
 /**
@@ -401,11 +407,14 @@ protected int runApplicationMaster(Configuration config) {
                        }
 
                        if (actorSystem != null) {
-                               try {
-                                       actorSystem.shutdown();
-                               } catch (Throwable tt) {
-                                       LOG.error("Error shutting down actor system", tt);
-                               }
+                               actorSystem.terminate().onComplete(
+                                       new OnComplete<Terminated>() {
+                                               public void onComplete(Throwable failure, Terminated result) {
+                                                       if (failure != null) {
+                                                               LOG.error("Error shutting down actor system", failure);
+                                                       }
+                                               }
+                                       }, directExecutionContext());
                        }
 
                        futureExecutor.shutdownNow();
@@ -418,7 +427,11 @@ protected int runApplicationMaster(Configuration config) {
                LOG.info("YARN Application Master started");
 
                // wait until everything is done
-               actorSystem.awaitTermination();
+               try {
+                       Await.ready(actorSystem.whenTerminated(), Duration.Inf());
+               } catch (InterruptedException | TimeoutException e) {
+                       LOG.error("Error shutting down actor system", e);
+               }
 
                // if we get here, everything work out jolly all right, and we even exited smoothly
                if (webMonitor != null) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to