This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 102369424488a33aca841c3bc63cc095bb82321e Author: Till Rohrmann <[email protected]> AuthorDate: Fri Feb 22 12:24:24 2019 +0100 [FLINK-11718] Add onStart method to TaskExecutor --- .../flink/runtime/taskexecutor/TaskExecutor.java | 85 ++++++++++++++++------ .../runtime/taskexecutor/TaskExecutorTest.java | 9 ++- .../runtime/taskexecutor/TestingTaskExecutor.java | 75 +++++++++++++++++++ 3 files changed, 145 insertions(+), 24 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 370823d..e75fab6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -271,25 +271,43 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { // ------------------------------------------------------------------------ @Override - public void start() throws Exception { - super.start(); - - // start by connecting to the ResourceManager + public void onStart() throws Exception { try { - resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener()); + startTaskExecutorServices(); } catch (Exception e) { - onFatalError(e); + final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), e); + onFatalError(exception); + throw exception; } - // tell the task slot table who's responsible for the task slot actions - taskSlotTable.start(new SlotActionsImpl()); + startRegistrationTimeout(); + } + + private void startTaskExecutorServices() throws Exception { + try { + // start by connecting to the ResourceManager + resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener()); - // start the job leader service - jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl()); + // tell the task slot table who's responsible for the task slot actions + taskSlotTable.start(new SlotActionsImpl()); - fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService()); + // start the job leader service + jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl()); - startRegistrationTimeout(); + fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService()); + } catch (Exception e) { + handleStartTaskExecutorServicesException(e); + } + } + + private void handleStartTaskExecutorServicesException(Exception e) throws Exception { + try { + stopTaskExecutorServices(); + } catch (Exception inner) { + e.addSuppressed(inner); + } + + throw e; } /** @@ -318,27 +336,52 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { resourceManagerHeartbeatManager.stop(); try { - resourceManagerLeaderRetriever.stop(); + stopTaskExecutorServices(); } catch (Exception e) { throwable = ExceptionUtils.firstOrSuppressed(e, throwable); } + if (throwable != null) { + return FutureUtils.completedExceptionally(new FlinkException("Error while shutting the TaskExecutor down.", throwable)); + } else { + log.info("Stopped TaskExecutor {}.", getAddress()); + return CompletableFuture.completedFuture(null); + } + } + + private void stopTaskExecutorServices() throws Exception { + Exception exception = null; + + try { + jobLeaderService.stop(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + taskSlotTable.stop(); + + try { + resourceManagerLeaderRetriever.stop(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + try { taskExecutorServices.shutDown(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + try { fileCache.shutdown(); - } catch (Throwable t) { - throwable = ExceptionUtils.firstOrSuppressed(t, throwable); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); } // it will call close() recursively from the parent to children taskManagerMetricGroup.close(); - if (throwable != null) { - return FutureUtils.completedExceptionally(new FlinkException("Error while shutting the TaskExecutor down.", throwable)); - } else { - log.info("Stopped TaskExecutor {}.", getAddress()); - return CompletableFuture.completedFuture(null); - } + ExceptionUtils.tryRethrowException(exception); } // ====================================================================== diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 1b68d15..5cdd782 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -344,7 +344,7 @@ public class TaskExecutorTest extends TestLogger { .setTaskStateManager(localStateStoresManager) .build(); - final TaskExecutor taskManager = new TaskExecutor( + final TestingTaskExecutor taskManager = new TestingTaskExecutor( rpc, taskManagerConfiguration, haServices, @@ -357,6 +357,7 @@ public class TaskExecutorTest extends TestLogger { try { taskManager.start(); + taskManager.waitUntilStarted(); rpc.registerGateway(jobMasterAddress, jobMasterGateway); @@ -1047,7 +1048,7 @@ public class TaskExecutorTest extends TestLogger { .setTaskStateManager(localStateStoresManager) .build(); - final TaskExecutor taskManager = new TaskExecutor( + final TestingTaskExecutor taskManager = new TestingTaskExecutor( rpc, taskManagerConfiguration, haServices, @@ -1060,6 +1061,7 @@ public class TaskExecutorTest extends TestLogger { try { taskManager.start(); + taskManager.waitUntilStarted(); final TaskExecutorGateway tmGateway = taskManager.getSelfGateway(TaskExecutorGateway.class); @@ -1273,7 +1275,7 @@ public class TaskExecutorTest extends TestLogger { .setTaskStateManager(localStateStoresManager) .build(); - final TaskExecutor taskExecutor = new TaskExecutor( + final TestingTaskExecutor taskExecutor = new TestingTaskExecutor( rpc, taskManagerConfiguration, haServices, @@ -1305,6 +1307,7 @@ public class TaskExecutorTest extends TestLogger { haServices.setJobMasterLeaderRetriever(jobId, jobMasterLeaderRetriever); taskExecutor.start(); + taskExecutor.waitUntilStarted(); final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java new file mode 100644 index 0000000..3078b7e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.runtime.blob.BlobCacheService; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; + +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; + +/** + * {@link TaskExecutor} extension for testing purposes. + */ +class TestingTaskExecutor extends TaskExecutor { + private final CompletableFuture<Void> startFuture = new CompletableFuture<>(); + + public TestingTaskExecutor( + RpcService rpcService, + TaskManagerConfiguration taskManagerConfiguration, + HighAvailabilityServices haServices, + TaskManagerServices taskExecutorServices, + HeartbeatServices heartbeatServices, + TaskManagerMetricGroup taskManagerMetricGroup, + @Nullable String metricQueryServicePath, + BlobCacheService blobCacheService, + FatalErrorHandler fatalErrorHandler) { + super( + rpcService, + taskManagerConfiguration, + haServices, + taskExecutorServices, + heartbeatServices, + taskManagerMetricGroup, + metricQueryServicePath, + blobCacheService, + fatalErrorHandler); + } + + @Override + public void onStart() throws Exception { + try { + super.onStart(); + } catch (Exception e) { + startFuture.completeExceptionally(e); + throw e; + } + + startFuture.complete(null); + } + + void waitUntilStarted() { + startFuture.join(); + } +}
