Repository: flink
Updated Branches:
  refs/heads/master f2ae2414d -> f429b4cde


[hotfix] Simplify TaskExecutorTest to avoid Mockito


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1c02e1a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1c02e1a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1c02e1a4

Branch: refs/heads/master
Commit: 1c02e1a43219890b4e9d7f2451f607a51a4ac348
Parents: 4c8851a
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Sun Feb 11 19:43:15 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Feb 12 11:59:29 2018 +0100

----------------------------------------------------------------------
 .../utils/TestingJobMasterGateway.java          | 178 ++++++
 .../utils/TestingResourceManagerGateway.java    |  10 +
 .../runtime/taskexecutor/TaskExecutorTest.java  | 592 +++++--------------
 3 files changed, 343 insertions(+), 437 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1c02e1a4/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
new file mode 100644
index 0000000..ae7a4f3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link JobMasterGateway} stub for tests: every method declared in this class throws {@link UnsupportedOperationException}; subclass it (as TaskExecutorTest's SimpleJobMasterGateway does) and override the calls a test needs.
+ */
+public class TestingJobMasterGateway extends TestingRestfulGateway implements 
JobMasterGateway {
+
+       @Override
+       public CompletableFuture<Acknowledge> cancel(Time timeout) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<Acknowledge> stop(Time timeout) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<Acknowledge> 
updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<SerializedInputSplit> 
requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID 
executionAttempt) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<ExecutionState> 
requestPartitionState(IntermediateDataSetID intermediateResultId, 
ResultPartitionID partitionId) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<Acknowledge> 
scheduleOrUpdateConsumers(ResultPartitionID partitionID, Time timeout) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<Acknowledge> disconnectTaskManager(ResourceID 
resourceID, Exception cause) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void disconnectResourceManager(ResourceManagerId 
resourceManagerId, Exception cause) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<ClassloadingProps> requestClassloadingProps() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<Collection<SlotOffer>> offerSlots(ResourceID 
taskManagerId, Collection<SlotOffer> slots, Time timeout) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void failSlot(ResourceID taskManagerId, AllocationID 
allocationId, Exception cause) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<RegistrationResponse> 
registerTaskManager(String taskManagerRpcAddress, TaskManagerLocation 
taskManagerLocation, Time timeout) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void heartbeatFromTaskManager(ResourceID resourceID) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void heartbeatFromResourceManager(ResourceID resourceID) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID 
executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, 
TaskStateSnapshot subtaskState) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void declineCheckpoint(JobID jobID, ExecutionAttemptID 
executionAttemptID, long checkpointId, Throwable cause) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public JobMasterId getFencingToken() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<String> triggerSavepoint(JobID jobId, String 
targetDirectory, Time timeout) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID 
jobId, Time timeout) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<KvStateLocation> requestKvStateLocation(JobID 
jobId, String registrationName) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<Acknowledge> notifyKvStateRegistered(JobID 
jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String 
registrationName, KvStateID kvStateId, InetSocketAddress kvStateServerAddress) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<Acknowledge> notifyKvStateUnregistered(JobID 
jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String 
registrationName) {
+               throw new UnsupportedOperationException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1c02e1a4/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index 1b6fbef..33c6c08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -82,6 +82,8 @@ public class TestingResourceManagerGateway implements 
ResourceManagerGateway {
 
        private volatile Function<Tuple2<ResourceID, FileType>, 
CompletableFuture<TransientBlobKey>> requestTaskManagerFileUploadFunction;
 
+       private volatile Consumer<Tuple2<ResourceID, Throwable>> 
disconnectTaskExecutorConsumer;
+
        public TestingResourceManagerGateway() {
                this(
                        ResourceManagerId.generate(),
@@ -135,6 +137,10 @@ public class TestingResourceManagerGateway implements 
ResourceManagerGateway {
                this.requestTaskManagerFileUploadFunction = 
requestTaskManagerFileUploadFunction;
        }
 
+       public void 
setDisconnectTaskExecutorConsumer(Consumer<Tuple2<ResourceID, Throwable>> 
disconnectTaskExecutorConsumer) {
+               this.disconnectTaskExecutorConsumer = 
disconnectTaskExecutorConsumer;
+       }
+
        @Override
        public CompletableFuture<RegistrationResponse> 
registerJobManager(JobMasterId jobMasterId, ResourceID jobMasterResourceId, 
String jobMasterAddress, JobID jobId, Time timeout) {
                final Consumer<Tuple4<JobMasterId, ResourceID, String, JobID>> 
currentConsumer = registerJobManagerConsumer;
@@ -229,7 +235,11 @@ public class TestingResourceManagerGateway implements 
ResourceManagerGateway {
 
        @Override
        public void disconnectTaskManager(ResourceID resourceID, Exception 
cause) {
+               final Consumer<Tuple2<ResourceID, Throwable>> currentConsumer = 
disconnectTaskExecutorConsumer;
 
+               if (currentConsumer != null) {
+                       currentConsumer.accept(Tuple2.of(resourceID, cause));
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1c02e1a4/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index cbd7965..e894e48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -57,6 +57,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -66,13 +67,13 @@ import 
org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
-import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -85,14 +86,12 @@ import 
org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.category.Flip6;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -105,7 +104,6 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -115,9 +113,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -125,8 +121,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Mockito.RETURNS_MOCKS;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyInt;
@@ -141,26 +137,57 @@ import static org.mockito.Mockito.when;
 @Category(Flip6.class)
 public class TaskExecutorTest extends TestLogger {
 
-       private final Time timeout = Time.milliseconds(10000L);
-
-       private TimerService<AllocationID> timerService;
+       private static final Time timeout = Time.milliseconds(10000L);
 
        private TestingRpcService rpc;
 
        private BlobCacheService dummyBlobCacheService;
 
+       private TimerService<AllocationID> timerService;
+
+       private Configuration configuration;
+
+       private TaskManagerConfiguration taskManagerConfiguration;
+
+       private TaskManagerLocation taskManagerLocation;
+
+       private JobID jobId;
+
+       private TestingFatalErrorHandler testingFatalErrorHandler;
+
+       private TestingHighAvailabilityServices haServices;
+
+       private TestingLeaderRetrievalService resourceManagerLeaderRetriever;
+
+       private TestingLeaderRetrievalService jobManagerLeaderRetriever;
+
        @Before
        public void setup() throws IOException {
                rpc = new TestingRpcService();
                timerService = new 
TimerService<>(TestingUtils.defaultExecutor(), timeout.toMilliseconds());
+
                dummyBlobCacheService = new BlobCacheService(
                        new Configuration(),
                        new VoidBlobStore(),
                        null);
+
+               configuration = new Configuration();
+               taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
+
+               taskManagerLocation = new LocalTaskManagerLocation();
+               jobId = new JobID();
+
+               testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+               haServices = new TestingHighAvailabilityServices();
+               resourceManagerLeaderRetriever = new 
TestingLeaderRetrievalService();
+               jobManagerLeaderRetriever = new TestingLeaderRetrievalService();
+               
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
+               haServices.setJobMasterLeaderRetriever(jobId, 
jobManagerLeaderRetriever);
        }
 
        @After
-       public void teardown() throws IOException {
+       public void teardown() throws Exception {
                if (rpc != null) {
                        rpc.stopService();
                        rpc = null;
@@ -175,6 +202,8 @@ public class TaskExecutorTest extends TestLogger {
                        dummyBlobCacheService.close();
                        dummyBlobCacheService = null;
                }
+
+               testingFatalErrorHandler.rethrowError();
        }
 
        @Rule
@@ -182,60 +211,21 @@ public class TaskExecutorTest extends TestLogger {
 
        @Test
        public void testHeartbeatTimeoutWithJobManager() throws Exception {
-               final JobID jobId = new JobID();
-               final Configuration configuration = new Configuration();
-               final TaskManagerConfiguration tmConfig = 
TaskManagerConfiguration.fromConfiguration(configuration);
-               final ResourceID tmResourceId = new ResourceID("tm");
-               final TaskManagerLocation taskManagerLocation = new 
TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
-               final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), 
mock(TimerService.class));
+               final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Arrays.asList(ResourceProfile.UNKNOWN), timerService);
 
                final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
-               final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-               final TestingLeaderRetrievalService rmLeaderRetrievalService = 
new TestingLeaderRetrievalService(
-                       null,
-                       null);
-               final TestingLeaderRetrievalService jmLeaderRetrievalService = 
new TestingLeaderRetrievalService(
-                       null,
-                       null);
-               haServices.setJobMasterLeaderRetriever(jobId, 
jmLeaderRetrievalService);
-               
haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
 
-               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
-
-               final long heartbeatTimeout = 10L;
+               final long heartbeatInterval = 1L;
+               final long heartbeatTimeout = 3L;
 
-               HeartbeatServices heartbeatServices = 
mock(HeartbeatServices.class);
-               when(heartbeatServices.createHeartbeatManager(
-                       eq(taskManagerLocation.getResourceID()),
-                       any(HeartbeatListener.class),
-                       any(ScheduledExecutor.class),
-                       any(Logger.class))).thenAnswer(
-                       new Answer<HeartbeatManagerImpl<Void, Void>>() {
-                               @Override
-                               public HeartbeatManagerImpl<Void, Void> 
answer(InvocationOnMock invocation) throws Throwable {
-                                       return new HeartbeatManagerImpl<>(
-                                               heartbeatTimeout,
-                                               
taskManagerLocation.getResourceID(),
-                                               (HeartbeatListener<Void, 
Void>)invocation.getArguments()[1],
-                                               
(Executor)invocation.getArguments()[2],
-                                               
(ScheduledExecutor)invocation.getArguments()[2],
-                                               
(Logger)invocation.getArguments()[3]);
-                               }
-                       }
-               );
+               HeartbeatServices heartbeatServices = new 
HeartbeatServices(heartbeatInterval, heartbeatTimeout);
 
                final String jobMasterAddress = "jm";
                final UUID jmLeaderId = UUID.randomUUID();
-               final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress);
-               final JobMasterGateway jobMasterGateway = 
mock(JobMasterGateway.class);
 
-               when(jobMasterGateway.registerTaskManager(
-                               any(String.class),
-                               eq(taskManagerLocation),
-                               any(Time.class)
-               )).thenReturn(CompletableFuture.completedFuture(new 
JMTMRegistrationSuccess(jmResourceId)));
-               
when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress);
-               when(jobMasterGateway.getHostname()).thenReturn("localhost");
+               final ResourceID jmResourceId = ResourceID.generate();
+               final SimpleJobMasterGateway jobMasterGateway = new 
SimpleJobMasterGateway(
+                       CompletableFuture.completedFuture(new 
JMTMRegistrationSuccess(jmResourceId)));
 
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskManagerLocation(taskManagerLocation)
@@ -245,7 +235,7 @@ public class TaskExecutorTest extends TestLogger {
 
                final TaskExecutor taskManager = new TaskExecutor(
                        rpc,
-                       tmConfig,
+                       taskManagerConfiguration,
                        haServices,
                        taskManagerServices,
                        heartbeatServices,
@@ -263,84 +253,62 @@ public class TaskExecutorTest extends TestLogger {
                        jobLeaderService.addJob(jobId, jobMasterAddress);
 
                        // now inform the task manager about the new job leader
-                       
jmLeaderRetrievalService.notifyListener(jobMasterAddress, jmLeaderId);
+                       
jobManagerLeaderRetriever.notifyListener(jobMasterAddress, jmLeaderId);
 
                        // register task manager success will trigger 
monitoring heartbeat target between tm and jm
-                       verify(jobMasterGateway, 
Mockito.timeout(timeout.toMilliseconds())).registerTaskManager(
-                                       eq(taskManager.getAddress()), 
eq(taskManagerLocation), any(Time.class));
+                       final TaskManagerLocation taskManagerLocation1 = 
jobMasterGateway.getRegisterTaskManagerFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+                       assertThat(taskManagerLocation1, 
equalTo(taskManagerLocation));
 
                        // the timeout should trigger disconnecting from the 
JobManager
-                       verify(jobMasterGateway, timeout(heartbeatTimeout * 
50L)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), 
any(TimeoutException.class));
-
-                       // check if a concurrent error occurred
-                       testingFatalErrorHandler.rethrowError();
+                       final ResourceID resourceID = 
jobMasterGateway.getDisconnectTaskManagerFuture().get(heartbeatTimeout * 50L, 
TimeUnit.MILLISECONDS);
+                       assertThat(resourceID, 
equalTo(taskManagerLocation.getResourceID()));
 
                } finally {
-                       taskManager.shutDown();
-                       
taskManager.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+                       RpcUtils.terminateRpcEndpoint(taskManager, timeout);
                }
        }
 
        @Test
        public void testHeartbeatTimeoutWithResourceManager() throws Exception {
                final String rmAddress = "rm";
-               final String tmAddress = "tm";
                final ResourceID rmResourceId = new ResourceID(rmAddress);
-               final ResourceID tmResourceId = new ResourceID(tmAddress);
-               final UUID rmLeaderId = UUID.randomUUID();
 
-               // register the mock resource manager gateway
-               ResourceManagerGateway rmGateway = 
mock(ResourceManagerGateway.class);
-               when(rmGateway.registerTaskExecutor(
-                       anyString(), any(ResourceID.class), 
any(SlotReport.class), anyInt(), any(HardwareDescription.class), 
any(Time.class)))
-                       .thenReturn(
-                               CompletableFuture.completedFuture(
-                                       new TaskExecutorRegistrationSuccess(
-                                               new InstanceID(),
-                                               rmResourceId,
-                                               10L,
-                                               new 
ClusterInformation("localhost", 1234))));
+               final long heartbeatInterval = 1L;
+               final long heartbeatTimeout = 3L;
 
-               rpc.registerGateway(rmAddress, rmGateway);
+               final ResourceManagerId rmLeaderId = 
ResourceManagerId.generate();
 
-               final TestingLeaderRetrievalService testLeaderService = new 
TestingLeaderRetrievalService(
-                       null,
-                       null);
-               final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-               haServices.setResourceManagerLeaderRetriever(testLeaderService);
+               TestingResourceManagerGateway rmGateway = new 
TestingResourceManagerGateway(
+                       rmLeaderId,
+                       rmResourceId,
+                       heartbeatInterval,
+                       rmAddress,
+                       rmAddress);
+
+               final TaskExecutorRegistrationSuccess registrationResponse = 
new TaskExecutorRegistrationSuccess(
+                       new InstanceID(),
+                       rmResourceId,
+                       heartbeatInterval,
+                       new ClusterInformation("localhost", 1234));
 
-               final TaskManagerConfiguration taskManagerConfiguration = 
mock(TaskManagerConfiguration.class);
-               when(taskManagerConfiguration.getNumberSlots()).thenReturn(1);
+               final CompletableFuture<ResourceID> 
taskExecutorRegistrationFuture = new CompletableFuture<>();
+               rmGateway.setRegisterTaskExecutorFunction(
+                       registration -> {
+                               
taskExecutorRegistrationFuture.complete(registration.f1);
+                               return 
CompletableFuture.completedFuture(registrationResponse);
+                       });
 
-               final TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
-               
when(taskManagerLocation.getResourceID()).thenReturn(tmResourceId);
+               final CompletableFuture<ResourceID> 
taskExecutorDisconnectFuture = new CompletableFuture<>();
+               rmGateway.setDisconnectTaskExecutorConsumer(
+                       disconnectInfo -> 
taskExecutorDisconnectFuture.complete(disconnectInfo.f0));
+
+               rpc.registerGateway(rmAddress, rmGateway);
 
                final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
                final SlotReport slotReport = new SlotReport();
                
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
 
-               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
-
-               final long heartbeatTimeout = 10L;
-               HeartbeatServices heartbeatServices = 
mock(HeartbeatServices.class);
-               when(heartbeatServices.createHeartbeatManager(
-                       eq(taskManagerLocation.getResourceID()),
-                       any(HeartbeatListener.class),
-                       any(ScheduledExecutor.class),
-                       any(Logger.class))).thenAnswer(
-                       new Answer<HeartbeatManagerImpl<SlotReport, Void>>() {
-                               @Override
-                               public HeartbeatManagerImpl<SlotReport, Void> 
answer(InvocationOnMock invocation) throws Throwable {
-                                       return new HeartbeatManagerImpl<>(
-                                               heartbeatTimeout,
-                                               
taskManagerLocation.getResourceID(),
-                                               (HeartbeatListener<SlotReport, 
Void>)invocation.getArguments()[1],
-                                               
(Executor)invocation.getArguments()[2],
-                                               
(ScheduledExecutor)invocation.getArguments()[2],
-                                               
(Logger)invocation.getArguments()[3]);
-                                       }
-                               }
-               );
+               HeartbeatServices heartbeatServices = new 
HeartbeatServices(heartbeatInterval, heartbeatTimeout);
 
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskManagerLocation(taskManagerLocation)
@@ -361,21 +329,51 @@ public class TaskExecutorTest extends TestLogger {
                        taskManager.start();
 
                        // define a leader and see that a registration happens
-                       testLeaderService.notifyListener(rmAddress, rmLeaderId);
+                       
resourceManagerLeaderRetriever.notifyListener(rmAddress, rmLeaderId.toUUID());
 
                        // register resource manager success will trigger 
monitoring heartbeat target between tm and rm
-                       verify(rmGateway, 
Mockito.timeout(timeout.toMilliseconds()).atLeast(1)).registerTaskExecutor(
-                                       eq(taskManager.getAddress()), 
eq(tmResourceId), any(SlotReport.class), anyInt(), 
any(HardwareDescription.class), any(Time.class));
+                       
assertThat(taskExecutorRegistrationFuture.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS), equalTo(taskManagerLocation.getResourceID()));
 
                        // heartbeat timeout should trigger disconnect 
TaskManager from ResourceManager
-                       verify(rmGateway, timeout(heartbeatTimeout * 
50L)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), 
any(TimeoutException.class));
-
-                       // check if a concurrent error occurred
-                       testingFatalErrorHandler.rethrowError();
+                       
assertThat(taskExecutorDisconnectFuture.get(heartbeatTimeout * 50L, 
TimeUnit.MILLISECONDS), equalTo(taskManagerLocation.getResourceID()));
 
                } finally {
-                       taskManager.shutDown();
-                       
taskManager.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+                       RpcUtils.terminateRpcEndpoint(taskManager, timeout);
+               }
+       }
+
+       private static final class SimpleJobMasterGateway extends 
TestingJobMasterGateway {
+
+               private final CompletableFuture<TaskManagerLocation> 
registerTaskManagerFuture = new CompletableFuture<>();
+
+               private final CompletableFuture<ResourceID> 
disconnectTaskManagerFuture = new CompletableFuture<>();
+
+               private final CompletableFuture<RegistrationResponse> 
registerTaskManagerResponseFuture;
+
+               private 
SimpleJobMasterGateway(CompletableFuture<RegistrationResponse> 
registerTaskManagerResponseFuture) {
+                       this.registerTaskManagerResponseFuture = 
registerTaskManagerResponseFuture;
+               }
+
+               @Override
+               public CompletableFuture<RegistrationResponse> 
registerTaskManager(String taskManagerRpcAddress, TaskManagerLocation 
taskManagerLocation, Time timeout) {
+                       registerTaskManagerFuture.complete(taskManagerLocation);
+
+                       return registerTaskManagerResponseFuture;
+               }
+
+               @Override
+               public CompletableFuture<Acknowledge> 
disconnectTaskManager(ResourceID resourceID, Exception cause) {
+                       disconnectTaskManagerFuture.complete(resourceID);
+
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+               }
+
+               CompletableFuture<TaskManagerLocation> 
getRegisterTaskManagerFuture() {
+                       return registerTaskManagerFuture;
+               }
+
+               CompletableFuture<ResourceID> getDisconnectTaskManagerFuture() {
+                       return disconnectTaskManagerFuture;
                }
        }
 
@@ -385,10 +383,9 @@ public class TaskExecutorTest extends TestLogger {
        @Test
        public void testHeartbeatSlotReporting() throws Exception {
                final long verificationTimeout = 1000L;
+               final long heartbeatTimeout = 10000L;
                final String rmAddress = "rm";
-               final String tmAddress = "tm";
                final ResourceID rmResourceId = new ResourceID(rmAddress);
-               final ResourceID tmResourceId = new ResourceID(tmAddress);
                final UUID rmLeaderId = UUID.randomUUID();
 
                // register the mock resource manager gateway
@@ -405,21 +402,8 @@ public class TaskExecutorTest extends TestLogger {
 
                rpc.registerGateway(rmAddress, rmGateway);
 
-               final TestingLeaderRetrievalService testLeaderService = new 
TestingLeaderRetrievalService(
-                       null,
-                       null);
-               final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-               haServices.setResourceManagerLeaderRetriever(testLeaderService);
-
-               final TaskManagerConfiguration taskManagerConfiguration = 
mock(TaskManagerConfiguration.class);
-               when(taskManagerConfiguration.getNumberSlots()).thenReturn(1);
-               
when(taskManagerConfiguration.getTimeout()).thenReturn(Time.seconds(10L));
-
-               final TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
-               
when(taskManagerLocation.getResourceID()).thenReturn(tmResourceId);
-
                final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
-               final SlotID slotId = new SlotID(tmResourceId, 0);
+               final SlotID slotId = new 
SlotID(taskManagerLocation.getResourceID(), 0);
                final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
                final SlotReport slotReport1 = new SlotReport(
                        new SlotStatus(
@@ -434,9 +418,6 @@ public class TaskExecutorTest extends TestLogger {
 
                
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport1,
 slotReport2);
 
-               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
-
-               final long heartbeatTimeout = 10000L;
                final HeartbeatServices heartbeatServices = 
mock(HeartbeatServices.class);
 
                when(heartbeatServices.createHeartbeatManager(
@@ -480,11 +461,11 @@ public class TaskExecutorTest extends TestLogger {
                        HeartbeatManager<Void, SlotReport> heartbeatManager = 
taskManager.getResourceManagerHeartbeatManager();
 
                        // define a leader and see that a registration happens
-                       testLeaderService.notifyListener(rmAddress, rmLeaderId);
+                       
resourceManagerLeaderRetriever.notifyListener(rmAddress, rmLeaderId);
 
                        // register resource manager success will trigger 
monitoring heartbeat target between tm and rm
                        verify(rmGateway, 
timeout(verificationTimeout).atLeast(1)).registerTaskExecutor(
-                               eq(taskManager.getAddress()), eq(tmResourceId), 
eq(slotReport1), anyInt(), any(HardwareDescription.class), any(Time.class));
+                               eq(taskManager.getAddress()), 
eq(taskManagerLocation.getResourceID()), eq(slotReport1), anyInt(), 
any(HardwareDescription.class), any(Time.class));
 
                        verify(heartbeatManager, 
timeout(verificationTimeout)).monitorTarget(any(ResourceID.class), 
any(HeartbeatTarget.class));
 
@@ -504,10 +485,6 @@ public class TaskExecutorTest extends TestLogger {
 
                        // the new slot report should be reported
                        assertEquals(slotReport2, actualSlotReport);
-
-                       // check if a concurrent error occurred
-                       testingFatalErrorHandler.rethrowError();
-
                } finally {
                        taskManager.shutDown();
                        
taskManager.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
@@ -516,7 +493,6 @@ public class TaskExecutorTest extends TestLogger {
 
        @Test
        public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
-               final ResourceID resourceID = ResourceID.generate();
                final String resourceManagerAddress = 
"/resource/manager/address/one";
                final ResourceID resourceManagerResourceId = new 
ResourceID(resourceManagerAddress);
                final String dispatcherAddress = "localhost";
@@ -530,14 +506,8 @@ public class TaskExecutorTest extends TestLogger {
                        .thenReturn(CompletableFuture.completedFuture(new 
TaskExecutorRegistrationSuccess(
                                new InstanceID(), resourceManagerResourceId, 
10L, new ClusterInformation("localhost", 1234))));
 
-               TaskManagerConfiguration taskManagerServicesConfiguration = 
mock(TaskManagerConfiguration.class);
-               
when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
-
                rpc.registerGateway(resourceManagerAddress, rmGateway);
 
-               TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
-               
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
-
                StandaloneHaServices haServices = new StandaloneHaServices(
                        resourceManagerAddress,
                        dispatcherAddress,
@@ -548,8 +518,6 @@ public class TaskExecutorTest extends TestLogger {
                final SlotReport slotReport = new SlotReport();
                
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
 
-               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
-
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskManagerLocation(taskManagerLocation)
                        .setTaskSlotTable(taskSlotTable)
@@ -557,7 +525,7 @@ public class TaskExecutorTest extends TestLogger {
 
                TaskExecutor taskManager = new TaskExecutor(
                        rpc,
-                       taskManagerServicesConfiguration,
+                       taskManagerConfiguration,
                        haServices,
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
@@ -570,10 +538,7 @@ public class TaskExecutorTest extends TestLogger {
                        String taskManagerAddress = taskManager.getAddress();
 
                        verify(rmGateway, 
Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
-                                       eq(taskManagerAddress), eq(resourceID), 
eq(slotReport), anyInt(), any(HardwareDescription.class), any(Time.class));
-
-                       // check if a concurrent error occurred
-                       testingFatalErrorHandler.rethrowError();
+                                       eq(taskManagerAddress), 
eq(taskManagerLocation.getResourceID()), eq(slotReport), anyInt(), 
any(HardwareDescription.class), any(Time.class));
                }
                finally {
                        taskManager.shutDown();
@@ -583,8 +548,6 @@ public class TaskExecutorTest extends TestLogger {
 
        @Test
        public void testTriggerRegistrationOnLeaderChange() throws Exception {
-               final ResourceID tmResourceID = ResourceID.generate();
-
                final String address1 = "/resource/manager/address/one";
                final String address2 = "/resource/manager/address/two";
                final UUID leaderId1 = UUID.randomUUID();
@@ -608,28 +571,10 @@ public class TaskExecutorTest extends TestLogger {
                rpc.registerGateway(address1, rmGateway1);
                rpc.registerGateway(address2, rmGateway2);
 
-               TestingLeaderRetrievalService testLeaderService = new 
TestingLeaderRetrievalService(
-                       null,
-                       null);
-
-               TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-               haServices.setResourceManagerLeaderRetriever(testLeaderService);
-
-               TaskManagerConfiguration taskManagerServicesConfiguration = 
mock(TaskManagerConfiguration.class);
-               
when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
-               
when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new 
Configuration());
-               
when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new 
String[1]);
-
-               TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
-               
when(taskManagerLocation.getResourceID()).thenReturn(tmResourceID);
-               when(taskManagerLocation.getHostname()).thenReturn("foobar");
-
                final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
                final SlotReport slotReport = new SlotReport();
                
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
 
-               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
-
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskManagerLocation(taskManagerLocation)
                        .setTaskSlotTable(taskSlotTable)
@@ -637,7 +582,7 @@ public class TaskExecutorTest extends TestLogger {
 
                TaskExecutor taskManager = new TaskExecutor(
                        rpc,
-                       taskManagerServicesConfiguration,
+                       taskManagerConfiguration,
                        haServices,
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
@@ -653,24 +598,21 @@ public class TaskExecutorTest extends TestLogger {
                        assertNull(taskManager.getResourceManagerConnection());
 
                        // define a leader and see that a registration happens
-                       testLeaderService.notifyListener(address1, leaderId1);
+                       resourceManagerLeaderRetriever.notifyListener(address1, 
leaderId1);
 
                        verify(rmGateway1, 
Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
-                                       eq(taskManagerAddress), 
eq(tmResourceID), any(SlotReport.class), anyInt(), 
any(HardwareDescription.class), any(Time.class));
+                                       eq(taskManagerAddress), 
eq(taskManagerLocation.getResourceID()), any(SlotReport.class), anyInt(), 
any(HardwareDescription.class), any(Time.class));
                        
assertNotNull(taskManager.getResourceManagerConnection());
 
                        // cancel the leader 
-                       testLeaderService.notifyListener(null, null);
+                       resourceManagerLeaderRetriever.notifyListener(null, 
null);
 
                        // set a new leader, see that a registration happens 
-                       testLeaderService.notifyListener(address2, leaderId2);
+                       resourceManagerLeaderRetriever.notifyListener(address2, 
leaderId2);
 
                        verify(rmGateway2, 
Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
-                                       eq(taskManagerAddress), 
eq(tmResourceID), eq(slotReport), anyInt(), any(HardwareDescription.class), 
any(Time.class));
+                                       eq(taskManagerAddress), 
eq(taskManagerLocation.getResourceID()), eq(slotReport), anyInt(), 
any(HardwareDescription.class), any(Time.class));
                        
assertNotNull(taskManager.getResourceManagerConnection());
-
-                       // check if a concurrent error occurred
-                       testingFatalErrorHandler.rethrowError();
                }
                finally {
                        taskManager.shutDown();
@@ -683,10 +625,6 @@ public class TaskExecutorTest extends TestLogger {
         */
        @Test(timeout = 10000L)
        public void testTaskSubmission() throws Exception {
-               final Configuration configuration = new Configuration();
-
-               final TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
-               final JobID jobId = new JobID();
                final AllocationID allocationId = new AllocationID();
                final JobMasterId jobMasterId = JobMasterId.generate();
                final JobVertexID jobVertexId = new JobVertexID();
@@ -720,8 +658,8 @@ public class TaskExecutorTest extends TestLogger {
                                0,
                                0,
                                null,
-                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                               
Collections.<InputGateDeploymentDescriptor>emptyList());
+                               Collections.emptyList(),
+                               Collections.emptyList());
 
                final LibraryCacheManager libraryCacheManager = 
mock(LibraryCacheManager.class);
                
when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
@@ -752,21 +690,6 @@ public class TaskExecutorTest extends TestLogger {
                when(networkEnvironment.createKvStateTaskRegistry(eq(jobId), 
eq(jobVertexId))).thenReturn(mock(TaskKvStateRegistry.class));
                
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
-               final TaskManagerMetricGroup taskManagerMetricGroup = 
mock(TaskManagerMetricGroup.class);
-
-               TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
-               
when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
-
-               when(taskManagerMetricGroup.addTaskForJob(
-                               any(JobID.class), anyString(), 
any(JobVertexID.class), any(ExecutionAttemptID.class),
-                               anyString(), anyInt(), anyInt())
-                       ).thenReturn(taskMetricGroup);
-
-               final HighAvailabilityServices haServices = 
mock(HighAvailabilityServices.class);
-               
when(haServices.getResourceManagerLeaderRetriever()).thenReturn(mock(LeaderRetrievalService.class));
-
-               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
-
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setNetworkEnvironment(networkEnvironment)
                        .setTaskSlotTable(taskSlotTable)
@@ -779,7 +702,7 @@ public class TaskExecutorTest extends TestLogger {
                        haServices,
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
-                       taskManagerMetricGroup,
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -793,9 +716,6 @@ public class TaskExecutorTest extends TestLogger {
                        CompletableFuture<Boolean> completionFuture = 
TestInvokable.completableFuture;
 
                        completionFuture.get();
-
-                       // check if a concurrent error occurred
-                       testingFatalErrorHandler.rethrowError();
                } finally {
                        taskManager.shutDown();
                        
taskManager.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
@@ -825,27 +745,9 @@ public class TaskExecutorTest extends TestLogger {
         */
        @Test
        public void testJobLeaderDetection() throws Exception {
-               final JobID jobId = new JobID();
-
-               final Configuration configuration = new Configuration();
-               final TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
-               final ResourceID resourceId = new ResourceID("foobar");
-               final TaskManagerLocation taskManagerLocation = new 
TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234);
-               final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-               final TimerService<AllocationID> timerService = 
mock(TimerService.class);
-               final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), timerService);
+               final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
                final JobManagerTable jobManagerTable = new JobManagerTable();
                final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
-               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
-
-               final TestingLeaderRetrievalService 
resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService(
-                       null,
-                       null);
-               final TestingLeaderRetrievalService 
jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService(
-                       null,
-                       null);
-               
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
-               haServices.setJobMasterLeaderRetriever(jobId, 
jobManagerLeaderRetrievalService);
 
                final String resourceManagerAddress = "rm";
                final ResourceManagerId resourceManagerLeaderId = 
ResourceManagerId.generate();
@@ -856,7 +758,7 @@ public class TaskExecutorTest extends TestLogger {
 
                when(resourceManagerGateway.registerTaskExecutor(
                        any(String.class),
-                       eq(resourceId),
+                       eq(taskManagerLocation.getResourceID()),
                        any(SlotReport.class),
                        anyInt(),
                        any(HardwareDescription.class),
@@ -882,7 +784,7 @@ public class TaskExecutorTest extends TestLogger {
                rpc.registerGateway(jobManagerAddress, jobMasterGateway);
 
                final AllocationID allocationId = new AllocationID();
-               final SlotID slotId = new SlotID(resourceId, 0);
+               final SlotID slotId = new 
SlotID(taskManagerLocation.getResourceID(), 0);
                final SlotOffer slotOffer = new SlotOffer(allocationId, 0, 
ResourceProfile.UNKNOWN);
 
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
@@ -908,7 +810,7 @@ public class TaskExecutorTest extends TestLogger {
                        final TaskExecutorGateway tmGateway = 
taskManager.getSelfGateway(TaskExecutorGateway.class);
 
                        // tell the task manager about the rm leader
-                       
resourceManagerLeaderRetrievalService.notifyListener(resourceManagerAddress, 
resourceManagerLeaderId.toUUID());
+                       
resourceManagerLeaderRetriever.notifyListener(resourceManagerAddress, 
resourceManagerLeaderId.toUUID());
 
                        // request slots from the task manager under the given 
allocation id
                        CompletableFuture<Acknowledge> slotRequestAck = 
tmGateway.requestSlot(
@@ -922,16 +824,13 @@ public class TaskExecutorTest extends TestLogger {
                        slotRequestAck.get();
 
                        // now inform the task manager about the new job leader
-                       
jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, 
jobManagerLeaderId);
+                       
jobManagerLeaderRetriever.notifyListener(jobManagerAddress, jobManagerLeaderId);
 
                        // the job leader should get the allocation id offered
                        verify(jobMasterGateway, 
Mockito.timeout(timeout.toMilliseconds())).offerSlots(
                                        any(ResourceID.class),
                                        
(Collection<SlotOffer>)Matchers.argThat(contains(slotOffer)),
                                        any(Time.class));
-
-                       // check if a concurrent error occurred
-                       testingFatalErrorHandler.rethrowError();
                } finally {
                        taskManager.shutDown();
                        
taskManager.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
@@ -944,18 +843,9 @@ public class TaskExecutorTest extends TestLogger {
         */
        @Test
        public void testSlotAcceptance() throws Exception {
-               final JobID jobId = new JobID();
-
-               final Configuration configuration = new Configuration();
-               final TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
-               final ResourceID resourceId = new ResourceID("foobar");
-               final TaskManagerLocation taskManagerLocation = new 
TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234);
-               final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-               final TimerService<AllocationID> timerService = 
mock(TimerService.class);
                final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Arrays.asList(mock(ResourceProfile.class), 
mock(ResourceProfile.class)), timerService);
                final JobManagerTable jobManagerTable = new JobManagerTable();
                final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
-               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
 
                final String resourceManagerAddress = "rm";
                final UUID resourceManagerLeaderId = UUID.randomUUID();
@@ -964,17 +854,15 @@ public class TaskExecutorTest extends TestLogger {
                final String jobManagerAddress = "jm";
                final UUID jobManagerLeaderId = UUID.randomUUID();
 
-               final LeaderRetrievalService 
resourceManagerLeaderRetrievalService = new 
TestingLeaderRetrievalService(resourceManagerAddress, resourceManagerLeaderId);
-               final LeaderRetrievalService jobManagerLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobManagerAddress, jobManagerLeaderId);
-               
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
-               haServices.setJobMasterLeaderRetriever(jobId, 
jobManagerLeaderRetrievalService);
+               
resourceManagerLeaderRetriever.notifyListener(resourceManagerAddress, 
resourceManagerLeaderId);
+               jobManagerLeaderRetriever.notifyListener(jobManagerAddress, 
jobManagerLeaderId);
 
                final ResourceManagerGateway resourceManagerGateway = 
mock(ResourceManagerGateway.class);
                final InstanceID registrationId = new InstanceID();
 
                when(resourceManagerGateway.registerTaskExecutor(
                        any(String.class),
-                       eq(resourceId),
+                       eq(taskManagerLocation.getResourceID()),
                        any(SlotReport.class),
                        anyInt(),
                        any(HardwareDescription.class),
@@ -1026,7 +914,7 @@ public class TaskExecutorTest extends TestLogger {
                        // wait for the registration at the ResourceManager
                        verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
                                eq(taskManager.getAddress()),
-                               eq(resourceId),
+                               eq(taskManagerLocation.getResourceID()),
                                any(SlotReport.class),
                                anyInt(),
                                any(HardwareDescription.class),
@@ -1041,15 +929,12 @@ public class TaskExecutorTest extends TestLogger {
 
                        verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).notifySlotAvailable(
                                eq(registrationId),
-                               eq(new SlotID(resourceId, 1)),
+                               eq(new 
SlotID(taskManagerLocation.getResourceID(), 1)),
                                eq(allocationId2));
 
                        assertTrue(taskSlotTable.existsActiveSlot(jobId, 
allocationId1));
                        assertFalse(taskSlotTable.existsActiveSlot(jobId, 
allocationId2));
                        assertTrue(taskSlotTable.isSlotFree(1));
-
-                       // check if a concurrent error occurred
-                       testingFatalErrorHandler.rethrowError();
                } finally {
                        taskManager.shutDown();
                        
taskManager.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
@@ -1057,143 +942,13 @@ public class TaskExecutorTest extends TestLogger {
        }
 
        /**
-        * Tests that all allocation requests for slots are ignored if the slot 
has been reported as
-        * free by the TaskExecutor but this report hasn't been confirmed by 
the ResourceManager.
-        *
-        * This is essential for the correctness of the state of the 
ResourceManager.
-        */
-       @Ignore
-       @Test
-       public void testRejectAllocationRequestsForOutOfSyncSlots() throws 
Exception {
-               final ResourceID resourceID = ResourceID.generate();
-
-               final String address1 = "/resource/manager/address/one";
-               final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
-               final JobID jobId = new JobID();
-               final String jobManagerAddress = "foobar";
-
-               // register the mock resource manager gateways
-               ResourceManagerGateway rmGateway1 = 
mock(ResourceManagerGateway.class);
-               rpc.registerGateway(address1, rmGateway1);
-
-               TestingLeaderRetrievalService testLeaderService = new 
TestingLeaderRetrievalService(
-                       address1,
-                       HighAvailabilityServices.DEFAULT_LEADER_ID);
-
-               TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-               haServices.setResourceManagerLeaderRetriever(testLeaderService);
-
-               TaskManagerConfiguration taskManagerServicesConfiguration = 
mock(TaskManagerConfiguration.class);
-               
when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
-
-               TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
-               
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
-
-               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
-               final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
-               
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(new 
SlotReport());
-               when(taskSlotTable.getCurrentAllocation(1)).thenReturn(new 
AllocationID());
-               when(rmGateway1.registerTaskExecutor(anyString(), 
eq(resourceID), any(SlotReport.class), anyInt(), 
any(HardwareDescription.class), any(Time.class))).thenReturn(
-               CompletableFuture.completedFuture(new 
TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), 1000L, 
new ClusterInformation("localhost", 1234))));
-
-               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
-                       .setTaskManagerLocation(taskManagerLocation)
-                       .setTaskSlotTable(taskSlotTable)
-                       .build();
-
-               TaskExecutor taskManager = new TaskExecutor(
-                       rpc,
-                       taskManagerServicesConfiguration,
-                       haServices,
-                       taskManagerServices,
-                       new HeartbeatServices(1000L, 1000L),
-                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       dummyBlobCacheService,
-                       testingFatalErrorHandler);
-
-               try {
-                       taskManager.start();
-
-                       final TaskExecutorGateway tmGateway = 
taskManager.getSelfGateway(TaskExecutorGateway.class);
-
-                       String taskManagerAddress = tmGateway.getAddress();
-
-                       // no connection initially, since there is no leader
-                       assertNull(taskManager.getResourceManagerConnection());
-
-                       // define a leader and see that a registration happens
-                       testLeaderService.notifyListener(address1, 
resourceManagerId.toUUID());
-
-                       verify(rmGateway1, 
Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
-                               eq(taskManagerAddress), eq(resourceID), 
any(SlotReport.class), anyInt(), any(HardwareDescription.class), 
any(Time.class));
-                       
assertNotNull(taskManager.getResourceManagerConnection());
-
-                       // test that allocating a slot works
-                       final SlotID slotID = new SlotID(resourceID, 0);
-                       tmGateway.requestSlot(slotID, jobId, new 
AllocationID(), jobManagerAddress, resourceManagerId, timeout);
-
-                       // TODO: Figure out the concrete allocation behaviour 
between RM and TM. Maybe we don't need the SlotID...
-                       // test that we can't allocate slots which are 
blacklisted due to pending confirmation of the RM
-                       final SlotID unconfirmedFreeSlotID = new 
SlotID(resourceID, 1);
-
-                       CompletableFuture<Acknowledge> requestSlotFuture = 
tmGateway.requestSlot(
-                               unconfirmedFreeSlotID,
-                               jobId,
-                               new AllocationID(),
-                               jobManagerAddress,
-                               resourceManagerId,
-                               timeout);
-
-                       try {
-                               requestSlotFuture.get();
-
-                               fail("The slot request should have failed.");
-                       } catch (Exception e) {
-                               assertTrue(ExceptionUtils.findThrowable(e, 
SlotAllocationException.class).isPresent());
-                       }
-
-                       // re-register
-                       verify(rmGateway1, 
Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
-                               eq(taskManagerAddress), eq(resourceID), 
any(SlotReport.class), anyInt(), any(HardwareDescription.class), 
any(Time.class));
-                       testLeaderService.notifyListener(address1, 
resourceManagerId.toUUID());
-
-                       // now we should be successful because the slots status 
has been synced
-                       // test that we can't allocate slots which are 
blacklisted due to pending confirmation of the RM
-                       tmGateway.requestSlot(
-                               unconfirmedFreeSlotID,
-                               jobId,
-                               new AllocationID(),
-                               jobManagerAddress,
-                               resourceManagerId,
-                               timeout);
-
-                       // check if a concurrent error occurred
-                       testingFatalErrorHandler.rethrowError();
-               }
-               finally {
-                       taskManager.shutDown();
-                       
taskManager.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
-               }
-
-       }
-
-       /**
         * This tests task executor receive SubmitTask before OfferSlot 
response.
         */
        @Test
        public void testSubmitTaskBeforeAcceptSlot() throws Exception {
-               final JobID jobId = new JobID();
-
-               final Configuration configuration = new Configuration();
-               final TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
-               final ResourceID resourceId = new ResourceID("foobar");
-               final TaskManagerLocation taskManagerLocation = new 
TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234);
-               final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-               final TimerService<AllocationID> timerService = 
mock(TimerService.class);
                final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Arrays.asList(mock(ResourceProfile.class), 
mock(ResourceProfile.class)), timerService);
                final JobManagerTable jobManagerTable = new JobManagerTable();
                final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
-               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
 
                final String resourceManagerAddress = "rm";
                final UUID resourceManagerLeaderId = UUID.randomUUID();
@@ -1202,17 +957,15 @@ public class TaskExecutorTest extends TestLogger {
                final String jobManagerAddress = "jm";
                final JobMasterId jobMasterId = JobMasterId.generate();
 
-               final LeaderRetrievalService 
resourceManagerLeaderRetrievalService = new 
TestingLeaderRetrievalService(resourceManagerAddress, resourceManagerLeaderId);
-               final LeaderRetrievalService jobManagerLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobManagerAddress, jobMasterId.toUUID());
-               
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
-               haServices.setJobMasterLeaderRetriever(jobId, 
jobManagerLeaderRetrievalService);
+               
resourceManagerLeaderRetriever.notifyListener(resourceManagerAddress, 
resourceManagerLeaderId);
+               jobManagerLeaderRetriever.notifyListener(jobManagerAddress, 
jobMasterId.toUUID());
 
                final ResourceManagerGateway resourceManagerGateway = 
mock(ResourceManagerGateway.class);
                final InstanceID registrationId = new InstanceID();
 
                when(resourceManagerGateway.registerTaskExecutor(
                        any(String.class),
-                       eq(resourceId),
+                       eq(taskManagerLocation.getResourceID()),
                        any(SlotReport.class),
                        anyInt(),
                        any(HardwareDescription.class),
@@ -1348,15 +1101,12 @@ public class TaskExecutorTest extends TestLogger {
 
                        verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).notifySlotAvailable(
                                eq(registrationId),
-                               eq(new SlotID(resourceId, 1)),
+                               eq(new 
SlotID(taskManagerLocation.getResourceID(), 1)),
                                any(AllocationID.class));
 
                        assertTrue(taskSlotTable.existsActiveSlot(jobId, 
allocationId1));
                        assertFalse(taskSlotTable.existsActiveSlot(jobId, 
allocationId2));
                        assertTrue(taskSlotTable.isSlotFree(1));
-
-                       // check if a concurrent error occurred
-                       testingFatalErrorHandler.rethrowError();
                } finally {
                        taskManager.shutDown();
                        
taskManager.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
@@ -1372,16 +1122,9 @@ public class TaskExecutorTest extends TestLogger {
        @Test
        public void testFilterOutDuplicateJobMasterRegistrations() throws 
Exception {
                final long verificationTimeout = 500L;
-               final Configuration configuration = new Configuration();
-               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
                final JobLeaderService jobLeaderService = 
mock(JobLeaderService.class);
-               final TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
-               final TaskManagerLocation taskManagerLocation = new 
TaskManagerLocation(ResourceID.generate(), InetAddress.getLocalHost(), 1234);
-
-               final HighAvailabilityServices haServicesMock = 
mock(HighAvailabilityServices.class, Mockito.RETURNS_MOCKS);
                final HeartbeatServices heartbeatServicesMock = 
mock(HeartbeatServices.class, Mockito.RETURNS_MOCKS);
 
-               final JobID jobId = new JobID();
                final JobMasterGateway jobMasterGateway = 
mock(JobMasterGateway.class);
                when(jobMasterGateway.getHostname()).thenReturn("localhost");
                final JMTMRegistrationSuccess registrationMessage = new 
JMTMRegistrationSuccess(ResourceID.generate());
@@ -1396,7 +1139,7 @@ public class TaskExecutorTest extends TestLogger {
                final TaskExecutor taskExecutor = new TaskExecutor(
                        rpc,
                        taskManagerConfiguration,
-                       haServicesMock,
+                       haServices,
                        taskManagerServices,
                        heartbeatServicesMock,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
@@ -1424,8 +1167,6 @@ public class TaskExecutorTest extends TestLogger {
                        JobManagerConnection jobManagerConnection = 
jobManagerConnectionArgumentCaptor.getValue();
 
                        assertEquals(jobMasterGateway, 
jobManagerConnection.getJobManagerGateway());
-
-                       testingFatalErrorHandler.rethrowError();
                } finally {
                        taskExecutor.shutDown();
                        
taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
@@ -1442,18 +1183,11 @@ public class TaskExecutorTest extends TestLogger {
                final long heartbeatInterval = 1L;
                final long heartbeatTimeout = 10000L;
                final long pollTimeout = 1000L;
-               final TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(new Configuration());
-               final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
-               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
                final RecordingHeartbeatServices heartbeatServices = new 
RecordingHeartbeatServices(heartbeatInterval, heartbeatTimeout);
                final ResourceID rmResourceID = ResourceID.generate();
 
                final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
 
-               final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-               final TestingLeaderRetrievalService rmLeaderRetrievalService = 
new TestingLeaderRetrievalService();
-               
haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
-
                final String rmAddress = "rm";
                final TestingResourceManagerGateway rmGateway = new 
TestingResourceManagerGateway(
                        ResourceManagerId.generate(),
@@ -1485,7 +1219,7 @@ public class TaskExecutorTest extends TestLogger {
                        final BlockingQueue<ResourceID> unmonitoredTargets = 
heartbeatServices.getUnmonitoredTargets();
                        final BlockingQueue<ResourceID> monitoredTargets = 
heartbeatServices.getMonitoredTargets();
 
-                       rmLeaderRetrievalService.notifyListener(rmAddress, 
rmGateway.getFencingToken().toUUID());
+                       
resourceManagerLeaderRetriever.notifyListener(rmAddress, 
rmGateway.getFencingToken().toUUID());
 
                        // wait for TM registration by checking the registered 
heartbeat targets
                        assertThat(
@@ -1493,12 +1227,10 @@ public class TaskExecutorTest extends TestLogger {
                                equalTo(rmResourceID));
 
                        // let RM lose leadership
-                       rmLeaderRetrievalService.notifyListener(null, null);
+                       resourceManagerLeaderRetriever.notifyListener(null, 
null);
 
                        // the timeout should not have triggered since it is 
much higher
                        assertThat(unmonitoredTargets.poll(pollTimeout, 
TimeUnit.MILLISECONDS), equalTo(rmResourceID));
-
-                       testingFatalErrorHandler.rethrowError();
                } finally {
                        RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
                }
@@ -1512,24 +1244,13 @@ public class TaskExecutorTest extends TestLogger {
         */
        @Test
        public void testRemoveJobFromJobLeaderService() throws Exception {
-               final Configuration configuration = new Configuration();
-               final TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
-               final LocalTaskManagerLocation localTaskManagerLocation = new 
LocalTaskManagerLocation();
-               final JobLeaderService jobLeaderService = new 
JobLeaderService(localTaskManagerLocation);
-               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
                final TaskSlotTable taskSlotTable = new TaskSlotTable(
                        Collections.singleton(ResourceProfile.UNKNOWN),
                        timerService);
 
-               final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-
-               final TestingLeaderRetrievalService 
resourceManagerLeaderRetriever = new TestingLeaderRetrievalService();
-               
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
-
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
-                       .setTaskManagerLocation(localTaskManagerLocation)
+                       .setTaskManagerLocation(taskManagerLocation)
                        .setTaskSlotTable(taskSlotTable)
-                       .setJobLeaderService(jobLeaderService)
                        .build();
 
                final TaskExecutor taskExecutor = new TaskExecutor(
@@ -1549,8 +1270,6 @@ public class TaskExecutorTest extends TestLogger {
                        
rpc.registerGateway(resourceManagerGateway.getAddress(), 
resourceManagerGateway);
                        
resourceManagerLeaderRetriever.notifyListener(resourceManagerGateway.getAddress(),
 resourceManagerId.toUUID());
 
-                       final JobID jobId = new JobID();
-
                        final CompletableFuture<LeaderRetrievalListener> 
startFuture = new CompletableFuture<>();
                        final CompletableFuture<Void> stopFuture = new 
CompletableFuture<>();
 
@@ -1563,10 +1282,11 @@ public class TaskExecutorTest extends TestLogger {
 
                        final TaskExecutorGateway taskExecutorGateway = 
taskExecutor.getSelfGateway(TaskExecutorGateway.class);
 
-                       final SlotID slotId = new 
SlotID(localTaskManagerLocation.getResourceID(), 0);
+                       final SlotID slotId = new 
SlotID(taskManagerLocation.getResourceID(), 0);
                        final AllocationID allocationId = new AllocationID();
 
                        assertThat(startFuture.isDone(), is(false));
+                       final JobLeaderService jobLeaderService = 
taskManagerServices.getJobLeaderService();
                        assertThat(jobLeaderService.containsJob(jobId), 
is(false));
 
                        taskExecutorGateway.requestSlot(
@@ -1586,8 +1306,6 @@ public class TaskExecutorTest extends TestLogger {
                        // wait that the job leader retrieval service for jobId 
stopped because it should get removed
                        stopFuture.get();
                        assertThat(jobLeaderService.containsJob(jobId), 
is(false));
-
-                       testingFatalErrorHandler.rethrowError();
                } finally {
                        RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
                }

Reply via email to