[FLINK-4839] [cluster management] JobManager handles TaskManager's slot offering
This closes #2647 #2643. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af924b48 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af924b48 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af924b48 Branch: refs/heads/master Commit: af924b489420b8d7163e6216c0efb05e3ab30514 Parents: a7ed9a5 Author: Kurt Young <[email protected]> Authored: Mon Oct 17 18:15:26 2016 +0800 Committer: Stephan Ewen <[email protected]> Committed: Fri Dec 23 20:54:23 2016 +0100 ---------------------------------------------------------------------- .../apache/flink/runtime/instance/SlotPool.java | 46 +++++++++--- .../flink/runtime/jobmaster/JobMaster.java | 50 ++++++++++++- .../runtime/jobmaster/JobMasterGateway.java | 26 +++++-- .../resourcemanager/ResourceManager.java | 2 +- .../runtime/taskexecutor/TaskExecutor.java | 34 +++++---- .../runtime/taskexecutor/slot/SlotOffer.java | 79 ++++++++++++++++++++ .../runtime/taskexecutor/slot/TaskSlot.java | 13 ++++ .../taskexecutor/slot/TaskSlotTable.java | 12 +-- .../runtime/taskexecutor/TaskExecutorTest.java | 16 +++- 9 files changed, 231 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index 7e7b21e..44df29b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -267,7 +267,7 @@ public class SlotPool implements SlotOwner { } // ------------------------------------------------------------------------ - // Slot De-allocation + // 
Slot releasing & offering // ------------------------------------------------------------------------ /** @@ -323,10 +323,6 @@ public class SlotPool implements SlotOwner { return null; } - // ------------------------------------------------------------------------ - // Slot Releasing - // ------------------------------------------------------------------------ - /** * Release slot to TaskManager, called for finished tasks or canceled jobs. * @@ -340,10 +336,6 @@ public class SlotPool implements SlotOwner { } } - // ------------------------------------------------------------------------ - // Slot Offering - // ------------------------------------------------------------------------ - /** * Slot offering by TaskManager with AllocationID. The AllocationID is originally generated by this pool and * transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation @@ -401,6 +393,39 @@ public class SlotPool implements SlotOwner { } // ------------------------------------------------------------------------ + // Error Handling + // ------------------------------------------------------------------------ + + /** + * Fail the specified allocation and release the corresponding slot if we have one. + * This may be triggered by the JobManager when a slot allocation fails with a timeout. + * It may also be triggered by the TaskManager when it finds out that something went wrong with the slot + * and decides to take it back. + * + * @param allocationID Represents the allocation which should be failed + * @param cause The cause of the failure + */ + public void failAllocation(final AllocationID allocationID, final Exception cause) { + synchronized (lock) { + // 1. check whether the allocation is still pending + Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>> pendingRequest = + pendingRequests.get(allocationID); + if (pendingRequest != null) { + pendingRequest.f1.completeExceptionally(cause); + return; + } + + // 2.
check whether we have a free slot corresponding to this allocation id + // TODO: add allocation id to slot descriptor, so we can remove it by allocation id + + // 3. check whether we have an in-use slot corresponding to this allocation id + // TODO: needs a mechanism to release the in-use Slot without returning it to this pool + + // TODO: add some unit tests when the previous two are ready, since the allocation may fail at any phase + } + } + + // ------------------------------------------------------------------------ // Resource // ------------------------------------------------------------------------ @@ -464,12 +489,13 @@ public class SlotPool implements SlotOwner { */ static class AllocatedSlots { - /** All allocated slots organized by TaskManager */ + /** All allocated slots organized by TaskManager's id */ private final Map<ResourceID, Set<Slot>> allocatedSlotsByResource; /** All allocated slots organized by Slot object */ private final Map<Slot, AllocationID> allocatedSlots; + /** All allocated slot descriptors organized by Slot object */ private final Map<Slot, SlotDescriptor> allocatedSlotsWithDescriptor; /** All allocated slots organized by AllocationID */ http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 7bcfb3a..3c6bbd3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -53,6 +53,7 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import
org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.instance.SlotDescriptor; import org.apache.flink.runtime.instance.SlotPool; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -85,6 +86,7 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.StartStoppable; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.SerializedThrowable; @@ -95,7 +97,9 @@ import org.slf4j.Logger; import javax.annotation.Nullable; import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.Executor; @@ -663,13 +667,51 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } @RpcMethod - public Iterable<AllocationID> offerSlots(final Iterable<AllocationID> slots, UUID leaderId) { - throw new UnsupportedOperationException("Has to be implemented."); + public Iterable<SlotOffer> offerSlots(final ResourceID taskManagerId, + final Iterable<SlotOffer> slots, final UUID leaderId) throws Exception + { + if (!this.leaderSessionID.equals(leaderId)) { + throw new Exception("Leader id not match, expected: " + this.leaderSessionID + + ", actual: " + leaderId); + } + + Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId); + if (taskManager == null) { + throw new Exception("Unknown TaskManager " + taskManagerId); + } + + final Set<SlotOffer> acceptedSlotOffers = new HashSet<>(4); + for 
(SlotOffer slotOffer : slots) { + final SlotDescriptor slotDescriptor = new SlotDescriptor( + jobGraph.getJobID(), + taskManager.f0, + slotOffer.getSlotIndex(), + slotOffer.getResourceProfile(), + null); // TODO: replace the actor gateway with the new rpc gateway, it's ready (taskManager.f1) + if (slotPool.offerSlot(slotOffer.getAllocationId(), slotDescriptor)) { + acceptedSlotOffers.add(slotOffer); + } + } + + return acceptedSlotOffers; } @RpcMethod - public void failSlot(final AllocationID allocationId, UUID leaderId, Exception cause) { - throw new UnsupportedOperationException("Has to be implemented."); + public void failSlot(final ResourceID taskManagerId, + final AllocationID allocationId, + final UUID leaderId, + final Exception cause) throws Exception + { + if (!this.leaderSessionID.equals(leaderId)) { + throw new Exception("Leader id not match, expected: " + this.leaderSessionID + + ", actual: " + leaderId); + } + + if (!registeredTaskManagers.containsKey(taskManagerId)) { + throw new Exception("Unknown TaskManager " + taskManagerId); + } + + slotPool.failAllocation(allocationId, cause); } @RpcMethod http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 8925d94..2d7ebb9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KvState; +import 
org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -166,21 +167,30 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { /** * Offer the given slots to the job manager. The response contains the set of accepted slots. * - * @param slots to offer to the job manager - * @param leaderId identifying the job leader - * @param timeout for the rpc call + * @param taskManagerId identifying the task manager + * @param slots to offer to the job manager + * @param leaderId identifying the job leader + * @param timeout for the rpc call * @return Future set of accepted slots. */ - Future<Iterable<AllocationID>> offerSlots(final Iterable<AllocationID> slots, UUID leaderId, @RpcTimeout final Time timeout); + Future<Iterable<SlotOffer>> offerSlots( + final ResourceID taskManagerId, + final Iterable<SlotOffer> slots, + final UUID leaderId, + @RpcTimeout final Time timeout); /** * Fail the slot with the given allocation id and cause. * - * @param allocationId identifying the slot to fail - * @param leaderId identifying the job leader - * @param cause of the failing + * @param taskManagerId identifying the task manager + * @param allocationId identifying the slot to fail + * @param leaderId identifying the job leader + * @param cause of the failing */ - void failSlot(final AllocationID allocationId, UUID leaderId, Exception cause); + void failSlot(final ResourceID taskManagerId, + final AllocationID allocationId, + final UUID leaderId, + final Exception cause); /** * Register the task manager at the job manager. 
http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 3122804..f1a5073 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -69,7 +69,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * ResourceManager implementation. The resource manager is responsible for resource de-/allocation * and bookkeeping. * - * It offers the following methods as part of its rpc interface to interact with the him remotely: + * It offers the following methods as part of its rpc interface to interact with him remotely: * <ul> * <li>{@link #registerJobMaster(UUID, UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li> * <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li> http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 5146e5b..679324b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -72,6 +72,8 @@ import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNot import 
org.apache.flink.runtime.taskexecutor.slot.SlotActions; import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException; import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskexecutor.slot.TaskSlot; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.Task; @@ -660,47 +662,49 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway(); - final Iterator<AllocationID> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId); + final Iterator<TaskSlot> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId); final UUID leaderId = jobManagerConnection.getLeaderId(); - final Collection<AllocationID> reservedSlots = new HashSet<>(2); + final Collection<SlotOffer> reservedSlots = new HashSet<>(2); while (reservedSlotsIterator.hasNext()) { - reservedSlots.add(reservedSlotsIterator.next()); + reservedSlots.add(reservedSlotsIterator.next().generateSlotOffer()); } - Future<Iterable<AllocationID>> acceptedSlotsFuture = jobMasterGateway.offerSlots( + Future<Iterable<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots( + getResourceID(), reservedSlots, leaderId, taskManagerConfiguration.getTimeout()); - acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction<Iterable<AllocationID>>() { + acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction<Iterable<SlotOffer>>() { @Override - public void accept(Iterable<AllocationID> acceptedSlots) { + public void accept(Iterable<SlotOffer> acceptedSlots) { // check if the response is still valid if (isJobManagerConnectionValid(jobId, leaderId)) { // mark accepted slots active - for (AllocationID acceptedSlot: acceptedSlots) { + for (SlotOffer acceptedSlot: acceptedSlots) { try { - 
if (!taskSlotTable.markSlotActive(acceptedSlot)) { + if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) { // the slot is either free or releasing at the moment final String message = "Could not mark slot " + jobId + " active."; log.debug(message); - jobMasterGateway.failSlot(acceptedSlot, leaderId, new Exception(message)); + jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(), + leaderId, new Exception(message)); } // remove the assigned slots so that we can free the left overs reservedSlots.remove(acceptedSlot); } catch (SlotNotFoundException e) { log.debug("Could not mark slot {} active.", acceptedSlot, e); - jobMasterGateway.failSlot(acceptedSlot, leaderId, e); + jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(), leaderId, e); } } final Exception e = new Exception("The slot was rejected by the JobManager."); - for (AllocationID rejectedSlot: reservedSlots) { - freeSlot(rejectedSlot, e); + for (SlotOffer rejectedSlot: reservedSlots) { + freeSlot(rejectedSlot.getAllocationId(), e); } } else { // discard the response since there is a new leader for the job @@ -718,8 +722,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { offerSlotsToJobManager(jobId); } else { // We encountered an exception. Free the slots and return them to the RM. 
- for (AllocationID reservedSlot: reservedSlots) { - freeSlot(reservedSlot, throwable); + for (SlotOffer reservedSlot: reservedSlots) { + freeSlot(reservedSlot.getAllocationId(), throwable); } } @@ -870,7 +874,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { private void unregisterTaskAndNotifyFinalState( final UUID jobMasterLeaderId, - final JobMasterGateway jobMasterGateway, + final JobMasterGateway jobMasterGateway, final ExecutionAttemptID executionAttemptID) { Task task = taskSlotTable.removeTask(executionAttemptID); http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java new file mode 100644 index 0000000..f8d7e6c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor.slot; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * Describes a slot that a task manager offers to the job manager. + */ +public class SlotOffer implements Serializable { + + private static final long serialVersionUID = -7067814231108250971L; + + /** Allocation id of this slot; it is the only identifier of this slot offer */ + private AllocationID allocationId; + + /** Index of the offered slot */ + private final int slotIndex; + + /** The resource profile of the offered slot */ + private final ResourceProfile resourceProfile; + + public SlotOffer(final AllocationID allocationID, final int index, final ResourceProfile resourceProfile) { + Preconditions.checkArgument(0 <= index, "The index must be greater than 0."); + this.allocationId = Preconditions.checkNotNull(allocationID); + this.slotIndex = index; + this.resourceProfile = Preconditions.checkNotNull(resourceProfile); + } + + public AllocationID getAllocationId() { + return allocationId; + } + + public int getSlotIndex() { + return slotIndex; + } + + public ResourceProfile getResourceProfile() { + return resourceProfile; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SlotOffer slotOffer = (SlotOffer) o; + return allocationId.equals(slotOffer.allocationId); + } + + @Override + public int hashCode() { + return allocationId.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java index 0942772..e12c15b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java @@ -286,4 +286,17 @@ public class TaskSlot { state = TaskSlotState.RELEASING; return true; } + + /** + * Generates the slot offer for this TaskSlot. + * + * @return The slot offer which this task slot can provide + */ + public SlotOffer generateSlotOffer() { + Preconditions.checkState(TaskSlotState.ACTIVE == state || TaskSlotState.ALLOCATED == state, + "The task slot is not in state active or allocated."); + Preconditions.checkState(allocationId != null, "The task slot are not allocated"); + + return new SlotOffer(allocationId, index, resourceProfile); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java index 88123b4..88b83a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java @@ -70,7 +70,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { /** Interface for slot actions, such as freeing them or timing them out */ private SlotActions slotActions; - + /** Whether the table has been started */ private boolean started; @@ -250,7 +250,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { */ public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException { checkInit(); - + if (LOG.isDebugEnabled()) {
LOG.debug("Free slot {}.", allocationId, cause); } else { @@ -370,13 +370,13 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { } /** - * Return an iterator of allocated slots (their allocation ids) for the given job id. + * Return an iterator of allocated slots for the given job id. * * @param jobId for which to return the allocated slots - * @return Iterator of allocation ids of allocated slots. + * @return Iterator of allocated slots. */ - public Iterator<AllocationID> getAllocatedSlots(JobID jobId) { - return new AllocationIDIterator(jobId, TaskSlotState.ALLOCATED); + public Iterator<TaskSlot> getAllocatedSlots(JobID jobId) { + return new TaskSlotIterator(jobId, TaskSlotState.ALLOCATED); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/af924b48/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 55cc142..4d73a4b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -65,6 +65,7 @@ import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequ import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; import org.apache.flink.runtime.taskexecutor.slot.TimerService; @@ -406,6 +407,7 @@ public class TaskExecutorTest extends 
TestLogger { final AllocationID allocationId = new AllocationID(); final SlotID slotId = new SlotID(resourceId, 0); + final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); try { TaskExecutor taskManager = new TaskExecutor( @@ -440,7 +442,11 @@ public class TaskExecutorTest extends TestLogger { jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, jobManagerLeaderId); // the job leader should get the allocation id offered - verify(jobMasterGateway).offerSlots((Iterable<AllocationID>)Matchers.argThat(contains(allocationId)), eq(jobManagerLeaderId), any(Time.class)); + verify(jobMasterGateway).offerSlots( + any(ResourceID.class), + (Iterable<SlotOffer>)Matchers.argThat(contains(slotOffer)), + eq(jobManagerLeaderId), + any(Time.class)); } finally { // check if a concurrent error occurred testingFatalErrorHandler.rethrowException(); @@ -496,6 +502,9 @@ public class TaskExecutorTest extends TestLogger { final AllocationID allocationId1 = new AllocationID(); final AllocationID allocationId2 = new AllocationID(); + final SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN); + final SlotOffer offer2 = new SlotOffer(allocationId2, 0, ResourceProfile.UNKNOWN); + final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); when(jobMasterGateway.registerTaskManager( @@ -506,8 +515,9 @@ public class TaskExecutorTest extends TestLogger { )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress); - when(jobMasterGateway.offerSlots(any(Iterable.class), eq(jobManagerLeaderId), any(Time.class))) - .thenReturn(FlinkCompletableFuture.completed((Iterable<AllocationID>)Collections.singleton(allocationId1))); + when(jobMasterGateway.offerSlots( + any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId), any(Time.class))) + 
.thenReturn(FlinkCompletableFuture.completed((Iterable<SlotOffer>)Collections.singleton(offer1))); rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); rpc.registerGateway(jobManagerAddress, jobMasterGateway);
