[FLINK-4958] [tm] Send slot report to RM when registering

Fix failing test cases
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87341001
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87341001
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87341001

Branch: refs/heads/master
Commit: 873410010df4be494f3573c4adfc2cbbc3ad5d0b
Parents: 5776235
Author: Till Rohrmann <[email protected]>
Authored: Fri Oct 28 15:04:00 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Dec 23 20:54:25 2016 +0100

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutor.java | 16 +-
 ...TaskExecutorToResourceManagerConnection.java | 48 +++--
 .../taskexecutor/slot/TaskSlotTable.java | 31 ++++
 .../taskexecutor/TaskExecutorITCase.java | 183 +++++++++++++++++++
 .../runtime/taskexecutor/TaskExecutorTest.java | 16 +-
 5 files changed, 277 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index c94113c..f11cb98 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -626,10 +626,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { resourceManagerConnection = new TaskExecutorToResourceManagerConnection( log, - this, + getRpcService(), + getAddress(), + getResourceID(), + taskSlotTable.createSlotReport(getResourceID()), newLeaderAddress, newLeaderId, - getMainThreadExecutor()); + getMainThreadExecutor(), + new
ForwardingFatalErrorHandler()); resourceManagerConnection.start(); } } @@ -1054,6 +1058,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } } + private final class ForwardingFatalErrorHandler implements FatalErrorHandler { + + @Override + public void onFatalError(Throwable exception) { + onFatalErrorAsync(exception); + } + } + private final class TaskManagerActionsImpl implements TaskManagerActions { private final UUID jobMasterLeaderId; private final JobMasterGateway jobMasterGateway; http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java index 53f030e..6e3e39b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java @@ -22,12 +22,14 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.registration.RegisteredRpcConnection; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.registration.RetryingRegistration; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import java.util.UUID; @@ -41,29 +43,49 @@ import 
static org.apache.flink.util.Preconditions.checkNotNull; public class TaskExecutorToResourceManagerConnection extends RegisteredRpcConnection<ResourceManagerGateway, TaskExecutorRegistrationSuccess> { - /** the TaskExecutor whose connection to the ResourceManager this represents */ - private final TaskExecutor taskExecutor; + private final RpcService rpcService; + + private final String taskManagerAddress; + + private final ResourceID taskManagerResourceId; + + private final SlotReport slotReport; + + private final FatalErrorHandler fatalErrorHandler; private InstanceID registrationId; public TaskExecutorToResourceManagerConnection( Logger log, - TaskExecutor taskExecutor, + RpcService rpcService, + String taskManagerAddress, + ResourceID taskManagerResourceId, + SlotReport slotReport, String resourceManagerAddress, UUID resourceManagerLeaderId, - Executor executor) { + Executor executor, + FatalErrorHandler fatalErrorHandler) { super(log, resourceManagerAddress, resourceManagerLeaderId, executor); - this.taskExecutor = checkNotNull(taskExecutor); + + this.rpcService = Preconditions.checkNotNull(rpcService); + this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress); + this.taskManagerResourceId = Preconditions.checkNotNull(taskManagerResourceId); + this.slotReport = Preconditions.checkNotNull(slotReport); + this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler); } @Override protected RetryingRegistration<ResourceManagerGateway, TaskExecutorRegistrationSuccess> generateRegistration() { return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration( - log, taskExecutor.getRpcService(), - getTargetAddress(), getTargetLeaderId(), - taskExecutor.getAddress(),taskExecutor.getResourceID()); + log, + rpcService, + getTargetAddress(), + getTargetLeaderId(), + taskManagerAddress, + taskManagerResourceId, + slotReport); } @Override @@ -78,7 +100,7 @@ public class TaskExecutorToResourceManagerConnection protected void 
onRegistrationFailure(Throwable failure) { log.info("Failed to register at resource manager {}.", getTargetAddress(), failure); - taskExecutor.onFatalErrorAsync(failure); + fatalErrorHandler.onFatalError(failure); } /** @@ -100,17 +122,21 @@ public class TaskExecutorToResourceManagerConnection private final ResourceID resourceID; + private final SlotReport slotReport; + ResourceManagerRegistration( Logger log, RpcService rpcService, String targetAddress, UUID leaderId, String taskExecutorAddress, - ResourceID resourceID) { + ResourceID resourceID, + SlotReport slotReport) { super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, leaderId); this.taskExecutorAddress = checkNotNull(taskExecutorAddress); this.resourceID = checkNotNull(resourceID); + this.slotReport = checkNotNull(slotReport); } @Override @@ -118,7 +144,7 @@ public class TaskExecutorToResourceManagerConnection ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception { Time timeout = Time.milliseconds(timeoutMillis); - return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, new SlotReport(), timeout); + return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, slotReport, timeout); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java index 88b83a0..081d8f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java @@ -21,8 +21,12 @@ package 
org.apache.flink.runtime.taskexecutor.slot; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.SlotStatus; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -127,6 +131,33 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { } // --------------------------------------------------------------------- + // Slot report methods + // --------------------------------------------------------------------- + + public SlotReport createSlotReport(ResourceID resourceId) { + final int numberSlots = taskSlots.size(); + + List<SlotStatus> slotStatuses = Arrays.asList(new SlotStatus[numberSlots]); + + for (int i = 0; i < numberSlots; i++) { + TaskSlot taskSlot = taskSlots.get(i); + SlotID slotId = new SlotID(resourceId, taskSlot.getIndex()); + + SlotStatus slotStatus = new SlotStatus( + slotId, + taskSlot.getResourceProfile(), + taskSlot.getJobId(), + taskSlot.getAllocationId()); + + slotStatuses.set(i, slotStatus); + } + + final SlotReport slotReport = new SlotReport(slotStatuses); + + return slotReport; + } + + // --------------------------------------------------------------------- // Slot methods // --------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java new file mode 100644 index 0000000..050db44 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; +import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; 
+import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; +import org.apache.flink.runtime.taskexecutor.slot.TimerService; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.net.InetAddress; +import java.util.Arrays; +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TaskExecutorITCase { + + @Test + public void testSlotAllocation() throws Exception { + TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + TestingHighAvailabilityServices testingHAServices = new TestingHighAvailabilityServices(); + final Configuration configuration = new Configuration(); + final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + final ResourceID taskManagerResourceId = new ResourceID("foobar"); + final UUID rmLeaderId = UUID.randomUUID(); + final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); + final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(); + final String rmAddress = "rm"; + final String jmAddress = "jm"; + final UUID jmLeaderId = UUID.randomUUID(); + final JobID jobId = new JobID(); + final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1L); + + testingHAServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); + 
testingHAServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); + testingHAServices.setJobMasterLeaderRetriever(jobId, new TestingLeaderRetrievalService(jmAddress, jmLeaderId)); + + TestingSerialRpcService rpcService = new TestingSerialRpcService(); + ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.milliseconds(500L), Time.milliseconds(500L)); + SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory(); + JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(testingHAServices); + MetricRegistry metricRegistry = mock(MetricRegistry.class); + + final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); + final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(taskManagerResourceId, InetAddress.getLocalHost(), 1234); + final MemoryManager memoryManager = mock(MemoryManager.class); + final IOManager ioManager = mock(IOManager.class); + final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class); + final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class); + final BroadcastVariableManager broadcastVariableManager = mock(BroadcastVariableManager.class); + final FileCache fileCache = mock(FileCache.class); + final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(resourceProfile), new TimerService<AllocationID>(scheduledExecutorService)); + final JobManagerTable jobManagerTable = new JobManagerTable(); + final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); + + ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager( + rpcService, + resourceManagerConfiguration, + testingHAServices, + slotManagerFactory, + metricRegistry, + jobLeaderIdService, + testingFatalErrorHandler); + + TaskExecutor taskExecutor = new TaskExecutor( + taskManagerConfiguration, + taskManagerLocation, + rpcService, + memoryManager, + 
ioManager, + networkEnvironment, + testingHAServices, + metricRegistry, + taskManagerMetricGroup, + broadcastVariableManager, + fileCache, + taskSlotTable, + jobManagerTable, + jobLeaderService, + testingFatalErrorHandler); + + JobMasterGateway jmGateway = mock(JobMasterGateway.class); + + when(jmGateway.registerTaskManager(any(String.class), any(TaskManagerLocation.class), eq(jmLeaderId), any(Time.class))) + .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(taskManagerResourceId, 1234))); + when(jmGateway.getAddress()).thenReturn(jmAddress); + + + rpcService.registerGateway(rmAddress, resourceManager.getSelf()); + rpcService.registerGateway(jmAddress, jmGateway); + + final AllocationID allocationId = new AllocationID(); + final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile); + final SlotOffer slotOffer = new SlotOffer(allocationId, 0, resourceProfile); + + try { + resourceManager.start(); + taskExecutor.start(); + + // notify the RM that it is the leader + rmLeaderElectionService.isLeader(rmLeaderId); + + // notify the TM about the new RM leader + rmLeaderRetrievalService.notifyListener(rmAddress, rmLeaderId); + + Future<RegistrationResponse> registrationResponseFuture = resourceManager.registerJobManager(rmLeaderId, jmLeaderId, jmAddress, jobId); + + RegistrationResponse registrationResponse = registrationResponseFuture.get(); + + assertTrue(registrationResponse instanceof JobMasterRegistrationSuccess); + + resourceManager.requestSlot(jmLeaderId, rmLeaderId, slotRequest); + + verify(jmGateway).offerSlots( + eq(taskManagerResourceId), + (Iterable<SlotOffer>)argThat(Matchers.contains(slotOffer)), + eq(jmLeaderId), any(Time.class)); + } finally { + if (testingFatalErrorHandler.hasExceptionOccurred()) { + testingFatalErrorHandler.rethrowError(); + } + } + + + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 1ef7140..2af97b5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -119,6 +119,10 @@ public class TaskExecutorTest extends TestLogger { NonHaServices haServices = new NonHaServices(resourceManagerAddress); + final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class); + final SlotReport slotReport = new SlotReport(); + when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport); + TaskExecutor taskManager = new TaskExecutor( taskManagerServicesConfiguration, taskManagerLocation, @@ -131,7 +135,7 @@ public class TaskExecutorTest extends TestLogger { mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), - mock(TaskSlotTable.class), + taskSlotTable, mock(JobManagerTable.class), mock(JobLeaderService.class), mock(FatalErrorHandler.class)); @@ -140,7 +144,7 @@ public class TaskExecutorTest extends TestLogger { String taskManagerAddress = taskManager.getAddress(); verify(rmGateway).registerTaskExecutor( - any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class)); + any(UUID.class), eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class)); } finally { rpc.stopService(); @@ -178,6 +182,10 @@ public class TaskExecutorTest extends TestLogger { when(taskManagerLocation.getResourceID()).thenReturn(resourceID); when(taskManagerLocation.getHostname()).thenReturn("foobar"); + final TaskSlotTable 
taskSlotTable = mock(TaskSlotTable.class); + final SlotReport slotReport = new SlotReport(); + when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport); + TaskExecutor taskManager = new TaskExecutor( taskManagerServicesConfiguration, taskManagerLocation, @@ -190,7 +198,7 @@ public class TaskExecutorTest extends TestLogger { mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), - mock(TaskSlotTable.class), + taskSlotTable, mock(JobManagerTable.class), mock(JobLeaderService.class), mock(FatalErrorHandler.class)); @@ -215,7 +223,7 @@ public class TaskExecutorTest extends TestLogger { testLeaderService.notifyListener(address2, leaderId2); verify(rmGateway2).registerTaskExecutor( - eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class)); + eq(leaderId2), eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class)); assertNotNull(taskManager.getResourceManagerConnection()); } finally {
