This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 102369424488a33aca841c3bc63cc095bb82321e
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Feb 22 12:24:24 2019 +0100

    [FLINK-11718] Add onStart method to TaskExecutor
---
 .../flink/runtime/taskexecutor/TaskExecutor.java   | 85 ++++++++++++++++------
 .../runtime/taskexecutor/TaskExecutorTest.java     |  9 ++-
 .../runtime/taskexecutor/TestingTaskExecutor.java  | 75 +++++++++++++++++++
 3 files changed, 145 insertions(+), 24 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 370823d..e75fab6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -271,25 +271,43 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
        // 
------------------------------------------------------------------------
 
        @Override
-       public void start() throws Exception {
-               super.start();
-
-               // start by connecting to the ResourceManager
+       public void onStart() throws Exception {
                try {
-                       resourceManagerLeaderRetriever.start(new 
ResourceManagerLeaderListener());
+                       startTaskExecutorServices();
                } catch (Exception e) {
-                       onFatalError(e);
+                       final TaskManagerException exception = new 
TaskManagerException(String.format("Could not start the TaskExecutor %s", 
getAddress()), e);
+                       onFatalError(exception);
+                       throw exception;
                }
 
-               // tell the task slot table who's responsible for the task slot 
actions
-               taskSlotTable.start(new SlotActionsImpl());
+               startRegistrationTimeout();
+       }
+
+       private void startTaskExecutorServices() throws Exception {
+               try {
+                       // start by connecting to the ResourceManager
+                       resourceManagerLeaderRetriever.start(new 
ResourceManagerLeaderListener());
 
-               // start the job leader service
-               jobLeaderService.start(getAddress(), getRpcService(), 
haServices, new JobLeaderListenerImpl());
+                       // tell the task slot table who's responsible for the 
task slot actions
+                       taskSlotTable.start(new SlotActionsImpl());
 
-               fileCache = new 
FileCache(taskManagerConfiguration.getTmpDirectories(), 
blobCacheService.getPermanentBlobService());
+                       // start the job leader service
+                       jobLeaderService.start(getAddress(), getRpcService(), 
haServices, new JobLeaderListenerImpl());
 
-               startRegistrationTimeout();
+                       fileCache = new 
FileCache(taskManagerConfiguration.getTmpDirectories(), 
blobCacheService.getPermanentBlobService());
+               } catch (Exception e) {
+                       handleStartTaskExecutorServicesException(e);
+               }
+       }
+
+       private void handleStartTaskExecutorServicesException(Exception e) 
throws Exception {
+               try {
+                       stopTaskExecutorServices();
+               } catch (Exception inner) {
+                       e.addSuppressed(inner);
+               }
+
+               throw e;
        }
 
        /**
@@ -318,27 +336,52 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
                resourceManagerHeartbeatManager.stop();
 
                try {
-                       resourceManagerLeaderRetriever.stop();
+                       stopTaskExecutorServices();
                } catch (Exception e) {
                        throwable = ExceptionUtils.firstOrSuppressed(e, 
throwable);
                }
 
+               if (throwable != null) {
+                       return FutureUtils.completedExceptionally(new 
FlinkException("Error while shutting the TaskExecutor down.", throwable));
+               } else {
+                       log.info("Stopped TaskExecutor {}.", getAddress());
+                       return CompletableFuture.completedFuture(null);
+               }
+       }
+
+       private void stopTaskExecutorServices() throws Exception {
+               Exception exception = null;
+
+               try {
+                       jobLeaderService.stop();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+               }
+
+               taskSlotTable.stop();
+
+               try {
+                       resourceManagerLeaderRetriever.stop();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+               }
+
                try {
                        taskExecutorServices.shutDown();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+               }
+
+               try {
                        fileCache.shutdown();
-               } catch (Throwable t) {
-                       throwable = ExceptionUtils.firstOrSuppressed(t, 
throwable);
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                }
 
                // it will call close() recursively from the parent to children
                taskManagerMetricGroup.close();
 
-               if (throwable != null) {
-                       return FutureUtils.completedExceptionally(new 
FlinkException("Error while shutting the TaskExecutor down.", throwable));
-               } else {
-                       log.info("Stopped TaskExecutor {}.", getAddress());
-                       return CompletableFuture.completedFuture(null);
-               }
+               ExceptionUtils.tryRethrowException(exception);
        }
 
        // 
======================================================================
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 1b68d15..5cdd782 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -344,7 +344,7 @@ public class TaskExecutorTest extends TestLogger {
                        .setTaskStateManager(localStateStoresManager)
                        .build();
 
-               final TaskExecutor taskManager = new TaskExecutor(
+               final TestingTaskExecutor taskManager = new TestingTaskExecutor(
                        rpc,
                        taskManagerConfiguration,
                        haServices,
@@ -357,6 +357,7 @@ public class TaskExecutorTest extends TestLogger {
 
                try {
                        taskManager.start();
+                       taskManager.waitUntilStarted();
 
                        rpc.registerGateway(jobMasterAddress, jobMasterGateway);
 
@@ -1047,7 +1048,7 @@ public class TaskExecutorTest extends TestLogger {
                        .setTaskStateManager(localStateStoresManager)
                        .build();
 
-               final TaskExecutor taskManager = new TaskExecutor(
+               final TestingTaskExecutor taskManager = new TestingTaskExecutor(
                        rpc,
                        taskManagerConfiguration,
                        haServices,
@@ -1060,6 +1061,7 @@ public class TaskExecutorTest extends TestLogger {
 
                try {
                        taskManager.start();
+                       taskManager.waitUntilStarted();
 
                        final TaskExecutorGateway tmGateway = 
taskManager.getSelfGateway(TaskExecutorGateway.class);
 
@@ -1273,7 +1275,7 @@ public class TaskExecutorTest extends TestLogger {
                        .setTaskStateManager(localStateStoresManager)
                        .build();
 
-               final TaskExecutor taskExecutor = new TaskExecutor(
+               final TestingTaskExecutor taskExecutor = new 
TestingTaskExecutor(
                        rpc,
                        taskManagerConfiguration,
                        haServices,
@@ -1305,6 +1307,7 @@ public class TaskExecutorTest extends TestLogger {
                        haServices.setJobMasterLeaderRetriever(jobId, 
jobMasterLeaderRetriever);
 
                        taskExecutor.start();
+                       taskExecutor.waitUntilStarted();
 
                        final TaskExecutorGateway taskExecutorGateway = 
taskExecutor.getSelfGateway(TaskExecutorGateway.class);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
new file mode 100644
index 0000000..3078b7e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link TaskExecutor} extension for testing purposes.
+ */
+class TestingTaskExecutor extends TaskExecutor {
+       private final CompletableFuture<Void> startFuture = new 
CompletableFuture<>();
+
+       public TestingTaskExecutor(
+                       RpcService rpcService,
+                       TaskManagerConfiguration taskManagerConfiguration,
+                       HighAvailabilityServices haServices,
+                       TaskManagerServices taskExecutorServices,
+                       HeartbeatServices heartbeatServices,
+                       TaskManagerMetricGroup taskManagerMetricGroup,
+                       @Nullable String metricQueryServicePath,
+                       BlobCacheService blobCacheService,
+                       FatalErrorHandler fatalErrorHandler) {
+               super(
+                       rpcService,
+                       taskManagerConfiguration,
+                       haServices,
+                       taskExecutorServices,
+                       heartbeatServices,
+                       taskManagerMetricGroup,
+                       metricQueryServicePath,
+                       blobCacheService,
+                       fatalErrorHandler);
+       }
+
+       @Override
+       public void onStart() throws Exception {
+               try {
+                       super.onStart();
+               } catch (Exception e) {
+                       startFuture.completeExceptionally(e);
+                       throw e;
+               }
+
+               startFuture.complete(null);
+       }
+
+       void waitUntilStarted() {
+               startFuture.join();
+       }
+}

Reply via email to