[FLINK-4958] [tm] Send slot report to RM when registering

Fix failing test cases


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87341001
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87341001
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87341001

Branch: refs/heads/master
Commit: 873410010df4be494f3573c4adfc2cbbc3ad5d0b
Parents: 5776235
Author: Till Rohrmann <[email protected]>
Authored: Fri Oct 28 15:04:00 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Dec 23 20:54:25 2016 +0100

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutor.java      |  16 +-
 ...TaskExecutorToResourceManagerConnection.java |  48 +++--
 .../taskexecutor/slot/TaskSlotTable.java        |  31 ++++
 .../taskexecutor/TaskExecutorITCase.java        | 183 +++++++++++++++++++
 .../runtime/taskexecutor/TaskExecutorTest.java  |  16 +-
 5 files changed, 277 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index c94113c..f11cb98 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -626,10 +626,14 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        resourceManagerConnection =
                                new TaskExecutorToResourceManagerConnection(
                                        log,
-                                       this,
+                                       getRpcService(),
+                                       getAddress(),
+                                       getResourceID(),
+                                       
taskSlotTable.createSlotReport(getResourceID()),
                                        newLeaderAddress,
                                        newLeaderId,
-                                       getMainThreadExecutor());
+                                       getMainThreadExecutor(),
+                                       new ForwardingFatalErrorHandler());
                        resourceManagerConnection.start();
                }
        }
@@ -1054,6 +1058,14 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
        }
 
+       private final class ForwardingFatalErrorHandler implements 
FatalErrorHandler {
+
+               @Override
+               public void onFatalError(Throwable exception) {
+                       onFatalErrorAsync(exception);
+               }
+       }
+
        private final class TaskManagerActionsImpl implements 
TaskManagerActions {
                private final UUID jobMasterLeaderId;
                private final JobMasterGateway jobMasterGateway;

http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 53f030e..6e3e39b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -22,12 +22,14 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.concurrent.Future;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
 import java.util.UUID;
@@ -41,29 +43,49 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 public class TaskExecutorToResourceManagerConnection
                extends RegisteredRpcConnection<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess> {
 
-       /** the TaskExecutor whose connection to the ResourceManager this 
represents */
-       private final TaskExecutor taskExecutor;
+       private final RpcService rpcService;
+
+       private final String taskManagerAddress;
+
+       private final ResourceID taskManagerResourceId;
+
+       private final SlotReport slotReport;
+
+       private final FatalErrorHandler fatalErrorHandler;
 
        private InstanceID registrationId;
 
        public TaskExecutorToResourceManagerConnection(
                        Logger log,
-                       TaskExecutor taskExecutor,
+                       RpcService rpcService,
+                       String taskManagerAddress,
+                       ResourceID taskManagerResourceId,
+                       SlotReport slotReport,
                        String resourceManagerAddress,
                        UUID resourceManagerLeaderId,
-                       Executor executor) {
+                       Executor executor,
+                       FatalErrorHandler fatalErrorHandler) {
 
                super(log, resourceManagerAddress, resourceManagerLeaderId, 
executor);
-               this.taskExecutor = checkNotNull(taskExecutor);
+
+               this.rpcService = Preconditions.checkNotNull(rpcService);
+               this.taskManagerAddress = 
Preconditions.checkNotNull(taskManagerAddress);
+               this.taskManagerResourceId = 
Preconditions.checkNotNull(taskManagerResourceId);
+               this.slotReport = Preconditions.checkNotNull(slotReport);
+               this.fatalErrorHandler = 
Preconditions.checkNotNull(fatalErrorHandler);
        }
 
 
        @Override
        protected RetryingRegistration<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess> generateRegistration() {
                return new 
TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
-                       log, taskExecutor.getRpcService(),
-                       getTargetAddress(), getTargetLeaderId(),
-                       taskExecutor.getAddress(),taskExecutor.getResourceID());
+                       log,
+                       rpcService,
+                       getTargetAddress(),
+                       getTargetLeaderId(),
+                       taskManagerAddress,
+                       taskManagerResourceId,
+                       slotReport);
        }
 
        @Override
@@ -78,7 +100,7 @@ public class TaskExecutorToResourceManagerConnection
        protected void onRegistrationFailure(Throwable failure) {
                log.info("Failed to register at resource manager {}.", 
getTargetAddress(), failure);
 
-               taskExecutor.onFatalErrorAsync(failure);
+               fatalErrorHandler.onFatalError(failure);
        }
 
        /**
@@ -100,17 +122,21 @@ public class TaskExecutorToResourceManagerConnection
                
                private final ResourceID resourceID;
 
+               private final SlotReport slotReport;
+
                ResourceManagerRegistration(
                                Logger log,
                                RpcService rpcService,
                                String targetAddress,
                                UUID leaderId,
                                String taskExecutorAddress,
-                               ResourceID resourceID) {
+                               ResourceID resourceID,
+                               SlotReport slotReport) {
 
                        super(log, rpcService, "ResourceManager", 
ResourceManagerGateway.class, targetAddress, leaderId);
                        this.taskExecutorAddress = 
checkNotNull(taskExecutorAddress);
                        this.resourceID = checkNotNull(resourceID);
+                       this.slotReport = checkNotNull(slotReport);
                }
 
                @Override
@@ -118,7 +144,7 @@ public class TaskExecutorToResourceManagerConnection
                                ResourceManagerGateway resourceManager, UUID 
leaderId, long timeoutMillis) throws Exception {
 
                        Time timeout = Time.milliseconds(timeoutMillis);
-                       return resourceManager.registerTaskExecutor(leaderId, 
taskExecutorAddress, resourceID, new SlotReport(), timeout);
+                       return resourceManager.registerTaskExecutor(leaderId, 
taskExecutorAddress, resourceID, slotReport, timeout);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 88b83a0..081d8f8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -21,8 +21,12 @@ package org.apache.flink.runtime.taskexecutor.slot;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -127,6 +131,33 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
        }
 
        // ---------------------------------------------------------------------
+       // Slot report methods
+       // ---------------------------------------------------------------------
+
+       public SlotReport createSlotReport(ResourceID resourceId) {
+               final int numberSlots = taskSlots.size();
+
+               List<SlotStatus> slotStatuses = Arrays.asList(new 
SlotStatus[numberSlots]);
+
+               for (int i = 0; i < numberSlots; i++) {
+                       TaskSlot taskSlot = taskSlots.get(i);
+                       SlotID slotId = new SlotID(resourceId, 
taskSlot.getIndex());
+
+                       SlotStatus slotStatus = new SlotStatus(
+                               slotId,
+                               taskSlot.getResourceProfile(),
+                               taskSlot.getJobId(),
+                               taskSlot.getAllocationId());
+
+                       slotStatuses.set(i, slotStatus);
+               }
+
+               final SlotReport slotReport = new SlotReport(slotStatuses);
+
+               return slotReport;
+       }
+
+       // ---------------------------------------------------------------------
        // Slot methods
        // ---------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
new file mode 100644
index 0000000..050db44
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.filecache.FileCache;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.slot.TimerService;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TaskExecutorITCase {
+
+       @Test
+       public void testSlotAllocation() throws Exception {
+               TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
+               TestingHighAvailabilityServices testingHAServices = new 
TestingHighAvailabilityServices();
+               final Configuration configuration = new Configuration();
+               final ScheduledExecutorService scheduledExecutorService = new 
ScheduledThreadPoolExecutor(1);
+               final ResourceID taskManagerResourceId = new 
ResourceID("foobar");
+               final UUID rmLeaderId = UUID.randomUUID();
+               final TestingLeaderElectionService rmLeaderElectionService = 
new TestingLeaderElectionService();
+               final TestingLeaderRetrievalService rmLeaderRetrievalService = 
new TestingLeaderRetrievalService();
+               final String rmAddress = "rm";
+               final String jmAddress = "jm";
+               final UUID jmLeaderId = UUID.randomUUID();
+               final JobID jobId = new JobID();
+               final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1L);
+
+               
testingHAServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
+               
testingHAServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
+               testingHAServices.setJobMasterLeaderRetriever(jobId, new 
TestingLeaderRetrievalService(jmAddress, jmLeaderId));
+
+               TestingSerialRpcService rpcService = new 
TestingSerialRpcService();
+               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(Time.milliseconds(500L), Time.milliseconds(500L));
+               SlotManagerFactory slotManagerFactory = new 
DefaultSlotManager.Factory();
+               JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(testingHAServices);
+               MetricRegistry metricRegistry = mock(MetricRegistry.class);
+
+               final TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
+               final TaskManagerLocation taskManagerLocation = new 
TaskManagerLocation(taskManagerResourceId, InetAddress.getLocalHost(), 1234);
+               final MemoryManager memoryManager = mock(MemoryManager.class);
+               final IOManager ioManager = mock(IOManager.class);
+               final NetworkEnvironment networkEnvironment = 
mock(NetworkEnvironment.class);
+               final TaskManagerMetricGroup taskManagerMetricGroup = 
mock(TaskManagerMetricGroup.class);
+               final BroadcastVariableManager broadcastVariableManager = 
mock(BroadcastVariableManager.class);
+               final FileCache fileCache = mock(FileCache.class);
+               final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Arrays.asList(resourceProfile), new 
TimerService<AllocationID>(scheduledExecutorService));
+               final JobManagerTable jobManagerTable = new JobManagerTable();
+               final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
+
+               ResourceManager<ResourceID> resourceManager = new 
StandaloneResourceManager(
+                       rpcService,
+                       resourceManagerConfiguration,
+                       testingHAServices,
+                       slotManagerFactory,
+                       metricRegistry,
+                       jobLeaderIdService,
+                       testingFatalErrorHandler);
+
+               TaskExecutor taskExecutor = new TaskExecutor(
+                       taskManagerConfiguration,
+                       taskManagerLocation,
+                       rpcService,
+                       memoryManager,
+                       ioManager,
+                       networkEnvironment,
+                       testingHAServices,
+                       metricRegistry,
+                       taskManagerMetricGroup,
+                       broadcastVariableManager,
+                       fileCache,
+                       taskSlotTable,
+                       jobManagerTable,
+                       jobLeaderService,
+                       testingFatalErrorHandler);
+
+               JobMasterGateway jmGateway = mock(JobMasterGateway.class);
+
+               when(jmGateway.registerTaskManager(any(String.class), 
any(TaskManagerLocation.class), eq(jmLeaderId), any(Time.class)))
+                       
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
JMTMRegistrationSuccess(taskManagerResourceId, 1234)));
+               when(jmGateway.getAddress()).thenReturn(jmAddress);
+
+
+               rpcService.registerGateway(rmAddress, 
resourceManager.getSelf());
+               rpcService.registerGateway(jmAddress, jmGateway);
+
+               final AllocationID allocationId = new AllocationID();
+               final SlotRequest slotRequest = new SlotRequest(jobId, 
allocationId, resourceProfile);
+               final SlotOffer slotOffer = new SlotOffer(allocationId, 0, 
resourceProfile);
+
+               try {
+                       resourceManager.start();
+                       taskExecutor.start();
+
+                       // notify the RM that it is the leader
+                       rmLeaderElectionService.isLeader(rmLeaderId);
+
+                       // notify the TM about the new RM leader
+                       rmLeaderRetrievalService.notifyListener(rmAddress, 
rmLeaderId);
+
+                       Future<RegistrationResponse> registrationResponseFuture 
= resourceManager.registerJobManager(rmLeaderId, jmLeaderId, jmAddress, jobId);
+
+                       RegistrationResponse registrationResponse = 
registrationResponseFuture.get();
+
+                       assertTrue(registrationResponse instanceof 
JobMasterRegistrationSuccess);
+
+                       resourceManager.requestSlot(jmLeaderId, rmLeaderId, 
slotRequest);
+
+                       verify(jmGateway).offerSlots(
+                               eq(taskManagerResourceId),
+                               
(Iterable<SlotOffer>)argThat(Matchers.contains(slotOffer)),
+                               eq(jmLeaderId), any(Time.class));
+               } finally {
+                       if (testingFatalErrorHandler.hasExceptionOccurred()) {
+                               testingFatalErrorHandler.rethrowError();
+                       }
+               }
+
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 1ef7140..2af97b5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -119,6 +119,10 @@ public class TaskExecutorTest extends TestLogger {
 
                        NonHaServices haServices = new 
NonHaServices(resourceManagerAddress);
 
+                       final TaskSlotTable taskSlotTable = 
mock(TaskSlotTable.class);
+                       final SlotReport slotReport = new SlotReport();
+                       
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
+
                        TaskExecutor taskManager = new TaskExecutor(
                                taskManagerServicesConfiguration,
                                taskManagerLocation,
@@ -131,7 +135,7 @@ public class TaskExecutorTest extends TestLogger {
                                mock(TaskManagerMetricGroup.class),
                                mock(BroadcastVariableManager.class),
                                mock(FileCache.class),
-                               mock(TaskSlotTable.class),
+                               taskSlotTable,
                                mock(JobManagerTable.class),
                                mock(JobLeaderService.class),
                                mock(FatalErrorHandler.class));
@@ -140,7 +144,7 @@ public class TaskExecutorTest extends TestLogger {
                        String taskManagerAddress = taskManager.getAddress();
 
                        verify(rmGateway).registerTaskExecutor(
-                                       any(UUID.class), 
eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
+                                       any(UUID.class), 
eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class));
                }
                finally {
                        rpc.stopService();
@@ -178,6 +182,10 @@ public class TaskExecutorTest extends TestLogger {
                        
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
                        
when(taskManagerLocation.getHostname()).thenReturn("foobar");
 
+                       final TaskSlotTable taskSlotTable = 
mock(TaskSlotTable.class);
+                       final SlotReport slotReport = new SlotReport();
+                       
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
+
                        TaskExecutor taskManager = new TaskExecutor(
                                taskManagerServicesConfiguration,
                                taskManagerLocation,
@@ -190,7 +198,7 @@ public class TaskExecutorTest extends TestLogger {
                                mock(TaskManagerMetricGroup.class),
                                mock(BroadcastVariableManager.class),
                                mock(FileCache.class),
-                               mock(TaskSlotTable.class),
+                               taskSlotTable,
                                mock(JobManagerTable.class),
                                mock(JobLeaderService.class),
                                mock(FatalErrorHandler.class));
@@ -215,7 +223,7 @@ public class TaskExecutorTest extends TestLogger {
                        testLeaderService.notifyListener(address2, leaderId2);
 
                        verify(rmGateway2).registerTaskExecutor(
-                                       eq(leaderId2), eq(taskManagerAddress), 
eq(resourceID), any(SlotReport.class), any(Time.class));
+                                       eq(leaderId2), eq(taskManagerAddress), 
eq(resourceID), eq(slotReport), any(Time.class));
                        
assertNotNull(taskManager.getResourceManagerConnection());
                }
                finally {

Reply via email to