Repository: flink
Updated Branches:
  refs/heads/master 15cdc5cc7 -> 4922ced71


[hotfix] Make TaskManagerRunner shutdown asynchronously


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4922ced7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4922ced7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4922ced7

Branch: refs/heads/master
Commit: 4922ced71a307a26b9f5070b41f72fd5d93b0ac8
Parents: c832f52
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu May 17 19:48:08 2018 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu May 17 19:49:44 2018 +0200

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskManagerRunner.java | 90 ++++++++++++++------
 .../taskexecutor/TaskManagerRunnerTest.java     | 80 +++++++++++++++++
 2 files changed, 142 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4922ced7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 08335b2..2a775bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -47,6 +48,7 @@ import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 
@@ -56,6 +58,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -69,10 +73,12 @@ import static org.apache.flink.util.Preconditions.checkState;
  * It constructs the related components (network, I/O manager, memory manager, 
RPC service, HA service)
  * and starts them.
  */
-public class TaskManagerRunner implements FatalErrorHandler {
+public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerRunner.class);
 
+       private static final long FATAL_ERROR_SHUTDOWN_TIMEOUT_MS = 10000L;
+
        private static final int STARTUP_FAILURE_RETURN_CODE = 1;
 
        private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
@@ -98,6 +104,10 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
        private final TaskExecutor taskManager;
 
+       private final CompletableFuture<Void> terminationFuture;
+
+       private boolean shutdown;
+
        public TaskManagerRunner(Configuration configuration, ResourceID 
resourceId) throws Exception {
                this.configuration = checkNotNull(configuration);
                this.resourceId = checkNotNull(resourceId);
@@ -137,6 +147,9 @@ public class TaskManagerRunner implements FatalErrorHandler {
                        blobCacheService,
                        false,
                        this);
+
+               this.terminationFuture = new CompletableFuture<>();
+               this.shutdown = false;
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -147,19 +160,37 @@ public class TaskManagerRunner implements FatalErrorHandler {
                taskManager.start();
        }
 
-       public void shutDown() throws Exception {
-               shutDownInternally();
-       }
-
-       protected void shutDownInternally() throws Exception {
-               Exception exception = null;
-
+       @Override
+       public CompletableFuture<Void> closeAsync() {
                synchronized (lock) {
-                       try {
+                       if (!shutdown) {
+                               shutdown = true;
+
                                taskManager.shutDown();
-                       } catch (Exception e) {
-                               exception = e;
+                               final CompletableFuture<Void> 
taskManagerTerminationFuture = taskManager.getTerminationFuture();
+
+                               final CompletableFuture<Void> 
serviceTerminationFuture = FutureUtils.composeAfterwards(
+                                       taskManagerTerminationFuture,
+                                       this::shutDownServices);
+
+                               serviceTerminationFuture.whenComplete(
+                                       (Void ignored, Throwable throwable) -> {
+                                               if (throwable != null) {
+                                                       terminationFuture.completeExceptionally(throwable);
+                                               } else {
+                                                       terminationFuture.complete(null);
+                                               }
+                                       });
                        }
+               }
+
+               return terminationFuture;
+       }
+
+       private CompletableFuture<Void> shutDownServices() {
+               synchronized (lock) {
+                       Collection<CompletableFuture<Void>> terminationFutures 
= new ArrayList<>(3);
+                       Exception exception = null;
 
                        try {
                                blobCacheService.close();
@@ -174,32 +205,26 @@ public class TaskManagerRunner implements FatalErrorHandler {
                        }
 
                        try {
-                               rpcService.stopService().get();
-                       } catch (InterruptedException ie) {
-                               exception = 
ExceptionUtils.firstOrSuppressed(ie, exception);
-
-                               Thread.currentThread().interrupt();
-                       } catch (Exception e) {
-                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-                       }
-
-                       try {
                                highAvailabilityServices.close();
                        } catch (Exception e) {
                                exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                        }
 
-                       ExecutorUtils.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor);
+                       terminationFutures.add(rpcService.stopService());
+
+                       terminationFutures.add(ExecutorUtils.nonBlockingShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor));
 
                        if (exception != null) {
-                               throw exception;
+                               terminationFutures.add(FutureUtils.completedExceptionally(exception));
                        }
+
+                       return FutureUtils.completeAll(terminationFutures);
                }
        }
 
        // export the termination future for caller to know it is terminated
        public CompletableFuture<Void> getTerminationFuture() {
-               return taskManager.getTerminationFuture();
+               return terminationFuture;
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -210,12 +235,21 @@ public class TaskManagerRunner implements FatalErrorHandler {
        public void onFatalError(Throwable exception) {
                LOG.error("Fatal error occurred while executing the 
TaskManager. Shutting it down...", exception);
 
-               try {
-                       shutDown();
-               } catch (Throwable t) {
-                       LOG.error("Could not properly shut down TaskManager.", 
t);
+               if (ExceptionUtils.isJvmFatalOrOutOfMemoryError(exception)) {
+                       terminateJVM();
+               } else {
+                       closeAsync();
+
+                       FutureUtils.orTimeout(terminationFuture, 
FATAL_ERROR_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+                       terminationFuture.whenComplete(
+                               (Void ignored, Throwable throwable) -> {
+                                       terminateJVM();
+                               });
                }
+       }
 
+       protected void terminateJVM() {
                System.exit(RUNTIME_FAILURE_RETURN_CODE);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4922ced7/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
new file mode 100644
index 0000000..f2f748d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.net.ServerSocket;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link TaskManagerRunner}.
+ */
+public class TaskManagerRunnerTest extends TestLogger {
+
+       @Test
+       public void testTaskManagerRunnerShutdown() throws Exception {
+               final Configuration configuration = new Configuration();
+               final ResourceID taskManagerResourceId = ResourceID.generate();
+
+               final ServerSocket localhost = new ServerSocket(0);
+
+               configuration.setString(JobManagerOptions.ADDRESS, 
localhost.getInetAddress().getHostName());
+               configuration.setInteger(JobManagerOptions.PORT, 
localhost.getLocalPort());
+               
configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "10 ms");
+               final CompletableFuture<Void> jvmTerminationFuture = new 
CompletableFuture<>();
+               final TestingTaskManagerRunner taskManagerRunner = new TestingTaskManagerRunner(configuration, taskManagerResourceId, jvmTerminationFuture);
+
+               taskManagerRunner.start();
+
+               try {
+                       // wait until we trigger the jvm termination
+                       jvmTerminationFuture.get();
+
+                       
assertThat(taskManagerRunner.getTerminationFuture().isDone(), is(true));
+               } finally {
+                       localhost.close();
+                       taskManagerRunner.close();
+               }
+       }
+
+       private static class TestingTaskManagerRunner extends TaskManagerRunner {
+
+               private final CompletableFuture<Void> jvmTerminationFuture;
+
+               public TestingTaskManagerRunner(Configuration configuration, ResourceID resourceId, CompletableFuture<Void> jvmTerminationFuture) throws Exception {
+                       super(configuration, resourceId);
+                       this.jvmTerminationFuture = jvmTerminationFuture;
+               }
+
+               @Override
+               protected void terminateJVM() {
+                       jvmTerminationFuture.complete(null);
+               }
+       }
+}

Reply via email to