Repository: flink Updated Branches: refs/heads/master 15cdc5cc7 -> 4922ced71
[hotfix] Make TaskManagerRunner shutdown asynchronously Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4922ced7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4922ced7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4922ced7 Branch: refs/heads/master Commit: 4922ced71a307a26b9f5070b41f72fd5d93b0ac8 Parents: c832f52 Author: Till Rohrmann <[email protected]> Authored: Thu May 17 19:48:08 2018 +0200 Committer: Till Rohrmann <[email protected]> Committed: Thu May 17 19:49:44 2018 +0200 ---------------------------------------------------------------------- .../runtime/taskexecutor/TaskManagerRunner.java | 90 ++++++++++++++------ .../taskexecutor/TaskManagerRunnerTest.java | 80 +++++++++++++++++ 2 files changed, 142 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4922ced7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 08335b2..2a775bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -27,6 +27,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobCacheService; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; @@ -47,6 +48,7 @@ import org.apache.flink.runtime.util.Hardware; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExecutorUtils; @@ -56,6 +58,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -69,10 +73,12 @@ import static org.apache.flink.util.Preconditions.checkState; * It constructs the related components (network, I/O manager, memory manager, RPC service, HA service) * and starts them. */ -public class TaskManagerRunner implements FatalErrorHandler { +public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync { private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class); + private static final long FATAL_ERROR_SHUTDOWN_TIMEOUT_MS = 10000L; + private static final int STARTUP_FAILURE_RETURN_CODE = 1; private static final int RUNTIME_FAILURE_RETURN_CODE = 2; @@ -98,6 +104,10 @@ public class TaskManagerRunner implements FatalErrorHandler { private final TaskExecutor taskManager; + private final CompletableFuture<Void> terminationFuture; + + private boolean shutdown; + public TaskManagerRunner(Configuration configuration, ResourceID resourceId) throws Exception { this.configuration = checkNotNull(configuration); this.resourceId = checkNotNull(resourceId); @@ -137,6 +147,9 @@ public class TaskManagerRunner implements FatalErrorHandler { blobCacheService, false, this); + + this.terminationFuture = new CompletableFuture<>(); + this.shutdown = false; } // 
-------------------------------------------------------------------------------------------- @@ -147,19 +160,37 @@ public class TaskManagerRunner implements FatalErrorHandler { taskManager.start(); } - public void shutDown() throws Exception { - shutDownInternally(); - } - - protected void shutDownInternally() throws Exception { - Exception exception = null; - + @Override + public CompletableFuture<Void> closeAsync() { synchronized (lock) { - try { + if (!shutdown) { + shutdown = true; + taskManager.shutDown(); - } catch (Exception e) { - exception = e; + final CompletableFuture<Void> taskManagerTerminationFuture = taskManager.getTerminationFuture(); + + final CompletableFuture<Void> serviceTerminationFuture = FutureUtils.composeAfterwards( + taskManagerTerminationFuture, + this::shutDownServices); + + serviceTerminationFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + if (throwable != null) { + terminationFuture.completeExceptionally(throwable); + } else { + terminationFuture.complete(null); + } + }); } + } + + return terminationFuture; + } + + private CompletableFuture<Void> shutDownServices() { + synchronized (lock) { + Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3); + Exception exception = null; try { blobCacheService.close(); @@ -174,32 +205,26 @@ public class TaskManagerRunner implements FatalErrorHandler { } try { - rpcService.stopService().get(); - } catch (InterruptedException ie) { - exception = ExceptionUtils.firstOrSuppressed(ie, exception); - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - try { highAvailabilityServices.close(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } - ExecutorUtils.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor); + terminationFutures.add(rpcService.stopService()); + + 
terminationFutures.add(ExecutorUtils.nonBlockingShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor)); if (exception != null) { - throw exception; + terminationFutures.add(FutureUtils.completedExceptionally(exception)); } + + return FutureUtils.completeAll(terminationFutures); } } // export the termination future for caller to know it is terminated public CompletableFuture<Void> getTerminationFuture() { - return taskManager.getTerminationFuture(); + return terminationFuture; } // -------------------------------------------------------------------------------------------- @@ -210,12 +235,21 @@ public class TaskManagerRunner implements FatalErrorHandler { public void onFatalError(Throwable exception) { LOG.error("Fatal error occurred while executing the TaskManager. Shutting it down...", exception); - try { - shutDown(); - } catch (Throwable t) { - LOG.error("Could not properly shut down TaskManager.", t); + if (ExceptionUtils.isJvmFatalOrOutOfMemoryError(exception)) { + terminateJVM(); + } else { + closeAsync(); + + FutureUtils.orTimeout(terminationFuture, FATAL_ERROR_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + terminationFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + terminateJVM(); + }); } + } + protected void terminateJVM() { System.exit(RUNTIME_FAILURE_RETURN_CODE); } http://git-wip-us.apache.org/repos/asf/flink/blob/4922ced7/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java new file mode 100644 index 0000000..f2f748d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.net.ServerSocket; +import java.util.concurrent.CompletableFuture; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link TaskManagerRunner}. 
+ */
+public class TaskManagerRunnerTest extends TestLogger {
+	// Verifies that, once a fatal error is reported, terminateJVM() is only requested after the runner termination future has completed.
+	@Test
+	public void testTaskManagerRunnerShutdown() throws Exception {
+		final Configuration configuration = new Configuration();
+		final ResourceID taskManagerResourceId = ResourceID.generate();
+		// The socket only supplies a reachable address/port for the JobManager options; presumably nothing there answers as an RPC endpoint, so registration times out (10 ms) and escalates to onFatalError() -- TODO confirm.
+		final ServerSocket localhost = new ServerSocket(0);
+		// NOTE(review): resources are not closed on every path: the socket is opened (and the runner built and started) before the try block, and in finally an IOException from localhost.close() would skip taskManagerRunner.close(). Consider try-with-resources.
+		configuration.setString(JobManagerOptions.ADDRESS, localhost.getInetAddress().getHostName());
+		configuration.setInteger(JobManagerOptions.PORT, localhost.getLocalPort());
+		configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "10 ms");
+		final CompletableFuture<Void> jvmTerminationFuture = new CompletableFuture<>();
+		final TestingTaskManagerRunner taskManagerRunner = new TestingTaskManagerRunner(configuration, taskManagerResourceId, jvmTerminationFuture);
+
+		taskManagerRunner.start();
+		// NOTE(review): the get() below has no timeout; if no fatal error is ever raised the test blocks indefinitely. Consider get(timeout, unit).
+		try {
+			// wait until we trigger the jvm termination
+			jvmTerminationFuture.get();
+			// terminateJVM() is called from a whenComplete callback on the termination future (except for JVM-fatal/OOM errors, where it is called directly), so the future should be done here.
+			assertThat(taskManagerRunner.getTerminationFuture().isDone(), is(true));
+		} finally {
+			localhost.close();
+			taskManagerRunner.close();
+		}
+	}
+	// Replaces the System.exit call in terminateJVM() with completing a future, so the test can observe the termination request without exiting the JVM.
+	private static class TestingTaskManagerRunner extends TaskManagerRunner {
+		// Completed by terminateJVM() instead of exiting the JVM.
+		private final CompletableFuture<Void> jvmTerminationFuture;
+
+		public TestingTaskManagerRunner(Configuration configuration, ResourceID resourceId, CompletableFuture<Void> jvmTerminationFuture) throws Exception {
+			super(configuration, resourceId);
+			this.jvmTerminationFuture = jvmTerminationFuture;
+		}
+		// Invoked from onFatalError() once shutdown completes or FATAL_ERROR_SHUTDOWN_TIMEOUT_MS elapses.
+		@Override
+		protected void terminateJVM() {
+			jvmTerminationFuture.complete(null);
+		}
+	}
+}
