[FLINK-4987] Harden SlotPool on JobMaster
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1ba9f11 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1ba9f11 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1ba9f11 Branch: refs/heads/master Commit: a1ba9f1126270d53168394211a99e354aa2cf20d Parents: 8730e20 Author: Stephan Ewen <[email protected]> Authored: Fri Oct 21 16:51:34 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Fri Dec 23 20:54:25 2016 +0100 ---------------------------------------------------------------------- .../clusterframework/types/ResourceProfile.java | 18 +- .../org/apache/flink/runtime/instance/Slot.java | 16 + .../flink/runtime/instance/SlotDescriptor.java | 162 --- .../apache/flink/runtime/instance/SlotPool.java | 1026 +++++++++++------- .../flink/runtime/instance/SlotPoolGateway.java | 95 ++ .../runtime/jobmanager/scheduler/Locality.java | 26 +- .../scheduler/NoResourceAvailableException.java | 6 +- .../runtime/jobmanager/slots/AllocatedSlot.java | 58 +- .../jobmanager/slots/PooledSlotProvider.java | 73 -- .../jobmanager/slots/SlotAndLocality.java | 55 + .../flink/runtime/jobmaster/JobMaster.java | 102 +- .../apache/flink/runtime/rpc/RpcEndpoint.java | 18 +- .../apache/flink/runtime/util/clock/Clock.java | 40 + .../flink/runtime/util/clock/SystemClock.java | 57 + .../types/ResourceProfileTest.java | 5 + .../runtime/instance/AllocatedSlotsTest.java | 270 ++--- .../runtime/instance/AvailableSlotsTest.java | 247 +++-- .../flink/runtime/instance/SlotPoolTest.java | 596 +++++----- .../runtime/minicluster/MiniClusterITCase.java | 13 +- 19 files changed, 1650 insertions(+), 1233 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java ---------------------------------------------------------------------- diff 
--git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java index 7a25de1..ddc7547 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java @@ -18,14 +18,21 @@ package org.apache.flink.runtime.clusterframework.types; +import javax.annotation.Nonnull; import java.io.Serializable; /** * Describe the resource profile of the slot, either when requiring or offering it. The profile can be * checked whether it can match another profile's requirement, and furthermore we may calculate a matching * score to decide which profile we should choose when we have lots of candidate slots. + * + * <p>Resource Profiles have a total ordering, defined by comparing these fields in sequence: + * <ol> + * <li>Memory Size</li> + * <li>CPU cores</li> + * </ol> */ -public class ResourceProfile implements Serializable { +public class ResourceProfile implements Serializable, Comparable<ResourceProfile> { private static final long serialVersionUID = 1L; @@ -90,11 +97,18 @@ public class ResourceProfile implements Serializable { return cpuCores >= required.getCpuCores() && memoryInMB >= required.getMemoryInMB(); } + @Override + public int compareTo(@Nonnull ResourceProfile other) { + int cmp1 = Long.compare(this.memoryInMB, other.memoryInMB); + int cmp2 = Double.compare(this.cpuCores, other.cpuCores); + return (cmp1 != 0) ? 
cmp1 : cmp2; + } + // ------------------------------------------------------------------------ @Override public int hashCode() { - long cpuBits = Double.doubleToLongBits(cpuCores); + long cpuBits = Double.doubleToRawLongBits(cpuCores); return (int) (cpuBits ^ (cpuBits >>> 32) ^ memoryInMB ^ (memoryInMB >> 32)); } http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java index 8f8b897..d6d8f12 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java @@ -350,6 +350,22 @@ public abstract class Slot { // Utilities // -------------------------------------------------------------------------------------------- + /** + * Slots must always hash based on reference identity. + */ + @Override + public final int hashCode() { + return super.hashCode(); + } + + /** + * Slots must always compare on referential equality. 
+ */ + @Override + public final boolean equals(Object obj) { + return this == obj; + } + @Override public String toString() { return hierarchy() + " - " + getTaskManagerLocation() + " - " + getStateName(status); http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java deleted file mode 100644 index 47ce422..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.instance; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * The description of slots, TaskManagers offer one or more task slots, which define a slice of - * their resources. This description will contain some static information about the slot, such - * as the location and numeric id of the slot, rpc gateway to communicate with the TaskManager which - * owns the slot. - */ -public class SlotDescriptor { - - /** The ID of the job this slice belongs to. */ - private final JobID jobID; - - /** The location information of the TaskManager to which this slot belongs */ - private final TaskManagerLocation taskManagerLocation; - - /** The number of the slot on which the task is deployed */ - private final int slotNumber; - - /** The resource profile of the slot provides */ - private final ResourceProfile resourceProfile; - - /** TEMP until the new RPC is in place: The actor gateway to communicate with the TaskManager */ - private final TaskManagerGateway taskManagerGateway; - - public SlotDescriptor( - final JobID jobID, - final TaskManagerLocation location, - final int slotNumber, - final ResourceProfile resourceProfile, - final TaskManagerGateway taskManagerGateway) - { - this.jobID = checkNotNull(jobID); - this.taskManagerLocation = checkNotNull(location); - this.slotNumber = slotNumber; - this.resourceProfile = checkNotNull(resourceProfile); - this.taskManagerGateway = checkNotNull(taskManagerGateway); - } - - public SlotDescriptor(final SlotDescriptor other) { - this.jobID = other.jobID; - this.taskManagerLocation = other.taskManagerLocation; - this.slotNumber = other.slotNumber; - this.resourceProfile = other.resourceProfile; - this.taskManagerGateway 
= other.taskManagerGateway; - } - - // TODO - temporary workaround until we have the SlotDesriptor in the Slot - public SlotDescriptor(final Slot slot) { - this.jobID = slot.getJobID(); - this.taskManagerLocation = slot.getTaskManagerLocation(); - this.slotNumber = slot.getRootSlotNumber(); - this.resourceProfile = new ResourceProfile(0, 0); - this.taskManagerGateway = slot.getTaskManagerGateway(); - } - - /** - * Returns the ID of the job this allocated slot belongs to. - * - * @return the ID of the job this allocated slot belongs to - */ - public JobID getJobID() { - return jobID; - } - - /** - * Gets the number of the slot. - * - * @return The number of the slot on the TaskManager. - */ - public int getSlotNumber() { - return slotNumber; - } - - /** - * Gets the resource profile of the slot. - * - * @return The resource profile of the slot. - */ - public ResourceProfile getResourceProfile() { - return resourceProfile; - } - - /** - * Gets the location info of the TaskManager that offers this slot. - * - * @return The location info of the TaskManager that offers this slot - */ - public TaskManagerLocation getTaskManagerLocation() { - return taskManagerLocation; - } - - /** - * Gets the actor gateway that can be used to send messages to the TaskManager. - * <p> - * This method should be removed once the new interface-based RPC abstraction is in place - * - * @return The actor gateway that can be used to send messages to the TaskManager. 
- */ - public TaskManagerGateway getTaskManagerGateway() { - return taskManagerGateway; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - SlotDescriptor that = (SlotDescriptor) o; - - if (slotNumber != that.slotNumber) { - return false; - } - if (!jobID.equals(that.jobID)) { - return false; - } - return taskManagerLocation.equals(that.taskManagerLocation); - - } - - @Override - public int hashCode() { - int result = jobID.hashCode(); - result = 31 * result + taskManagerLocation.hashCode(); - result = 31 * result + slotNumber; - return result; - } - - @Override - public String toString() { - return taskManagerLocation + " - " + slotNumber; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index 44df29b..5a3a321 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -25,25 +25,41 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.runtime.concurrent.ApplyFunction; -import org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality; import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered; import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected; import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcMethod; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.StartStoppable; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.util.clock.Clock; +import org.apache.flink.runtime.util.clock.SystemClock; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -58,18 +74,33 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * <p> * All the allocation or the slot offering will be identified by self generated AllocationID, we will use it to * eliminate ambiguities. 
+ * + * TODO : Make pending requests location preference aware + * TODO : Make pass location preferences to ResourceManager when sending a slot request */ -public class SlotPool implements SlotOwner { +public class SlotPool extends RpcEndpoint<SlotPoolGateway> { + + /** The log for the pool - shared also with the internal classes */ + static final Logger LOG = LoggerFactory.getLogger(SlotPool.class); + + // ------------------------------------------------------------------------ - private static final Logger LOG = LoggerFactory.getLogger(SlotPool.class); + private static final Time DEFAULT_SLOT_REQUEST_TIMEOUT = Time.minutes(5); + + private static final Time DEFAULT_RM_ALLOCATION_TIMEOUT = Time.minutes(10); + + private static final Time DEFAULT_RM_REQUEST_TIMEOUT = Time.seconds(10); + + // ------------------------------------------------------------------------ private final Object lock = new Object(); - /** The executor which is used to execute futures */ - private final Executor executor; + private final JobID jobId; - /** All registered resources, slots will be accepted and used only if the resource is registered */ - private final Set<ResourceID> registeredResources; + private final ProviderAndOwner providerAndOwner; + + /** All registered TaskManagers, slots will be accepted and used only if the resource is registered */ + private final HashSet<ResourceID> registeredTaskManagers; /** The book-keeping of all allocated slots */ private final AllocatedSlots allocatedSlots; @@ -78,10 +109,15 @@ public class SlotPool implements SlotOwner { private final AvailableSlots availableSlots; /** All pending requests waiting for slots */ - private final Map<AllocationID, Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>>> pendingRequests; + private final HashMap<AllocationID, PendingRequest> pendingRequests; + + /** Timeout for request calls to the ResourceManager */ + private final Time resourceManagerRequestsTimeout; - /** Timeout of slot allocation */ - private 
final Time timeout; + /** Timeout for allocation round trips (RM -> launch TM -> offer slot) */ + private final Time resourceManagerAllocationTimeout; + + private final Clock clock; /** the leader id of job manager */ private UUID jobManagerLeaderId; @@ -92,177 +128,238 @@ public class SlotPool implements SlotOwner { /** The gateway to communicate with resource manager */ private ResourceManagerGateway resourceManagerGateway; - public SlotPool(final Executor executor) { - this.executor = executor; - this.registeredResources = new HashSet<>(); + // ------------------------------------------------------------------------ + + public SlotPool(RpcService rpcService, JobID jobId) { + this(rpcService, jobId, SystemClock.getInstance(), + DEFAULT_SLOT_REQUEST_TIMEOUT, DEFAULT_RM_ALLOCATION_TIMEOUT, DEFAULT_RM_REQUEST_TIMEOUT); + } + + public SlotPool( + RpcService rpcService, + JobID jobId, + Clock clock, + Time slotRequestTimeout, + Time resourceManagerAllocationTimeout, + Time resourceManagerRequestTimeout) { + + super(rpcService); + + this.jobId = checkNotNull(jobId); + this.clock = checkNotNull(clock); + this.resourceManagerRequestsTimeout = checkNotNull(resourceManagerRequestTimeout); + this.resourceManagerAllocationTimeout = checkNotNull(resourceManagerAllocationTimeout); + + this.registeredTaskManagers = new HashSet<>(); this.allocatedSlots = new AllocatedSlots(); this.availableSlots = new AvailableSlots(); this.pendingRequests = new HashMap<>(); - this.timeout = Time.of(5, TimeUnit.SECONDS); - } - public void setJobManagerLeaderId(final UUID jobManagerLeaderId) { - this.jobManagerLeaderId = jobManagerLeaderId; + this.providerAndOwner = new ProviderAndOwner(getSelf(), slotRequestTimeout); } // ------------------------------------------------------------------------ - // Slot Allocation + // Starting and Stopping // ------------------------------------------------------------------------ + @Override + public void start() { + throw new 
UnsupportedOperationException("Should never call start() without leader ID"); + } + /** - * Try to allocate a simple slot with specified resource profile. + * Start the slot pool to accept RPC calls. * - * @param jobID The job id which the slot allocated for - * @param resourceProfile The needed resource profile - * @return The future of allocated simple slot + * @param jobManagerLeaderId The necessary leader id for running the job. */ - public Future<SimpleSlot> allocateSimpleSlot(final JobID jobID, final ResourceProfile resourceProfile) { - return allocateSimpleSlot(jobID, resourceProfile, new AllocationID()); - } + public void start(UUID jobManagerLeaderId) { + this.jobManagerLeaderId = jobManagerLeaderId; + // TODO - start should not throw an exception + try { + super.start(); + } catch (Exception e) { + throw new RuntimeException("This should never happen", e); + } + } /** - * Try to allocate a simple slot with specified resource profile and specified allocation id. It's mainly - * for testing purpose since we need to specify whatever allocation id we want. + * Suspends this pool, meaning it has lost its authority to accept and distribute slots. 
*/ - @VisibleForTesting - Future<SimpleSlot> allocateSimpleSlot( - final JobID jobID, - final ResourceProfile resourceProfile, - final AllocationID allocationID) - { - final FlinkCompletableFuture<SlotDescriptor> future = new FlinkCompletableFuture<>(); - - internalAllocateSlot(jobID, allocationID, resourceProfile, future); - - return future.thenApplyAsync( - new ApplyFunction<SlotDescriptor, SimpleSlot>() { - @Override - public SimpleSlot apply(SlotDescriptor descriptor) { - SimpleSlot slot = new SimpleSlot( - descriptor.getJobID(), SlotPool.this, - descriptor.getTaskManagerLocation(), descriptor.getSlotNumber(), - descriptor.getTaskManagerGateway()); - synchronized (lock) { - // double validation since we are out of the lock protection after the slot is granted - if (registeredResources.contains(descriptor.getTaskManagerLocation().getResourceID())) { - LOG.info("Allocation[{}] Allocated simple slot: {} for job {}.", allocationID, slot, jobID); - allocatedSlots.add(allocationID, descriptor, slot); - } - else { - throw new RuntimeException("Resource was marked dead asynchronously."); - } - } - return slot; - } - }, - executor - ); + @RpcMethod + public void suspend() { + validateRunsInMainThread(); + + // suspend this RPC endpoint + ((StartStoppable) getSelf()).stop(); + + // do not accept any requests + jobManagerLeaderId = null; + resourceManagerLeaderId = null; + resourceManagerGateway = null; + + // Clear (but not release!) the available slots. The TaskManagers should re-register them + // at the new leader JobManager/SlotPool + availableSlots.clear(); + allocatedSlots.clear(); + pendingRequests.clear(); } + // ------------------------------------------------------------------------ + // Getting PoolOwner and PoolProvider + // ------------------------------------------------------------------------ /** - * Try to allocate a shared slot with specified resource profile. 
- * - * @param jobID The job id which the slot allocated for - * @param resourceProfile The needed resource profile - * @param sharingGroupAssignment The slot sharing group of the vertex - * @return The future of allocated shared slot + * Gets the slot owner implementation for this pool. + * + * <p>This method does not mutate state and can be called directly (no RPC indirection) + * + * @return The slot owner implementation for this pool. */ - public Future<SharedSlot> allocateSharedSlot( - final JobID jobID, - final ResourceProfile resourceProfile, - final SlotSharingGroupAssignment sharingGroupAssignment) - { - return allocateSharedSlot(jobID, resourceProfile, sharingGroupAssignment, new AllocationID()); + public SlotOwner getSlotOwner() { + return providerAndOwner; } /** - * Try to allocate a shared slot with specified resource profile and specified allocation id. It's mainly - * for testing purpose since we need to specify whatever allocation id we want. + * Gets the slot provider implementation for this pool. + * + * <p>This method does not mutate state and can be called directly (no RPC indirection) + * + * @return The slot provider implementation for this pool. 
*/ - @VisibleForTesting - Future<SharedSlot> allocateSharedSlot( - final JobID jobID, - final ResourceProfile resourceProfile, - final SlotSharingGroupAssignment sharingGroupAssignment, - final AllocationID allocationID) - { - final FlinkCompletableFuture<SlotDescriptor> future = new FlinkCompletableFuture<>(); - - internalAllocateSlot(jobID, allocationID, resourceProfile, future); - - return future.thenApplyAsync( - new ApplyFunction<SlotDescriptor, SharedSlot>() { - @Override - public SharedSlot apply(SlotDescriptor descriptor) { - SharedSlot slot = new SharedSlot( - descriptor.getJobID(), SlotPool.this, descriptor.getTaskManagerLocation(), - descriptor.getSlotNumber(), descriptor.getTaskManagerGateway(), - sharingGroupAssignment); - - synchronized (lock) { - // double validation since we are out of the lock protection after the slot is granted - if (registeredResources.contains(descriptor.getTaskManagerLocation().getResourceID())) { - LOG.info("Allocation[{}] Allocated shared slot: {} for job {}.", allocationID, slot, jobID); - allocatedSlots.add(allocationID, descriptor, slot); - } - else { - throw new RuntimeException("Resource was marked dead asynchronously."); - } - } - return slot; - } - }, - executor - ); + public SlotProvider getSlotProvider() { + return providerAndOwner; } - /** - * Internally allocate the slot with specified resource profile. We will first check whether we have some - * free slot which can meet the requirement already and allocate it immediately. Otherwise, we will try to - * allocation the slot from resource manager. 
- */ - private void internalAllocateSlot( - final JobID jobID, - final AllocationID allocationID, - final ResourceProfile resourceProfile, - final FlinkCompletableFuture<SlotDescriptor> future) - { - LOG.info("Allocation[{}] Allocating slot with {} for Job {}.", allocationID, resourceProfile, jobID); - - synchronized (lock) { - // check whether we have any free slot which can match the required resource profile - SlotDescriptor freeSlot = availableSlots.poll(resourceProfile); - if (freeSlot != null) { - future.complete(freeSlot); - } - else { - if (resourceManagerGateway != null) { - LOG.info("Allocation[{}] No available slot exists, trying to allocate from resource manager.", - allocationID); - SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile); - pendingRequests.put(allocationID, new Tuple2<>(slotRequest, future)); - resourceManagerGateway.requestSlot(jobManagerLeaderId, resourceManagerLeaderId, slotRequest, timeout) - .handleAsync(new BiFunction<RMSlotRequestReply, Throwable, Void>() { - @Override - public Void apply(RMSlotRequestReply slotRequestReply, Throwable throwable) { - if (throwable != null) { - future.completeExceptionally( - new Exception("Slot allocation from resource manager failed", throwable)); - } else if (slotRequestReply instanceof RMSlotRequestRejected) { - future.completeExceptionally( - new Exception("Slot allocation rejected by resource manager")); - } - return null; - } - }, executor); + // ------------------------------------------------------------------------ + // Resource Manager Connection + // ------------------------------------------------------------------------ + + @RpcMethod + public void connectToResourceManager(UUID resourceManagerLeaderId, ResourceManagerGateway resourceManagerGateway) { + this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId); + this.resourceManagerGateway = checkNotNull(resourceManagerGateway); + } + + @RpcMethod + public void disconnectResourceManager() { + 
this.resourceManagerLeaderId = null; + this.resourceManagerGateway = null; + } + + // ------------------------------------------------------------------------ + // Slot Allocation + // ------------------------------------------------------------------------ + + @RpcMethod + public Future<SimpleSlot> allocateSlot( + ScheduledUnit task, + ResourceProfile resources, + Iterable<TaskManagerLocation> locationPreferences) { + + return internalAllocateSlot(task, resources, locationPreferences); + } + + @RpcMethod + public void returnAllocatedSlot(Slot slot) { + internalReturnAllocatedSlot(slot); + } + + + Future<SimpleSlot> internalAllocateSlot( + ScheduledUnit task, + ResourceProfile resources, + Iterable<TaskManagerLocation> locationPreferences) { + + // (1) do we have a slot available already? + SlotAndLocality slotFromPool = availableSlots.poll(resources, locationPreferences); + if (slotFromPool != null) { + SimpleSlot slot = createSimpleSlot(slotFromPool.slot(), slotFromPool.locality()); + allocatedSlots.add(slot); + return FlinkCompletableFuture.completed(slot); + } + + // (2) no slot available, and no resource manager connection + if (resourceManagerGateway == null) { + return FlinkCompletableFuture.completedExceptionally( + new NoResourceAvailableException("not connected to ResourceManager and no slot available")); + + } + + // (3) we have a resource manager connection, so let's ask it for more resources + final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>(); + final AllocationID allocationID = new AllocationID(); + + LOG.info("Requesting slot with profile {} from resource manager (request = {}).", resources, allocationID); + + pendingRequests.put(allocationID, new PendingRequest(allocationID, future, resources)); + + Future<RMSlotRequestReply> rmResponse = resourceManagerGateway.requestSlot( + resourceManagerLeaderId, jobManagerLeaderId, + new SlotRequest(jobId, allocationID, resources), + resourceManagerRequestsTimeout); + + // on 
success, trigger let the slot pool know + rmResponse.thenAcceptAsync(new AcceptFunction<RMSlotRequestReply>() { + @Override + public void accept(RMSlotRequestReply reply) { + if (reply.getAllocationID() != null && reply.getAllocationID().equals(allocationID)) { + if (reply instanceof RMSlotRequestRegistered) { + slotRequestToResourceManagerSuccess(allocationID); + } + else if (reply instanceof RMSlotRequestRejected) { + slotRequestToResourceManagerFailed(allocationID, + new Exception("ResourceManager rejected slot request")); + } + else { + slotRequestToResourceManagerFailed(allocationID, + new Exception("Unknown ResourceManager response: " + reply)); + } } else { - LOG.warn("Allocation[{}] Resource manager not available right now.", allocationID); - future.completeExceptionally(new Exception("Resource manager not available right now.")); + future.completeExceptionally(new Exception(String.format( + "Bug: ResourceManager response had wrong AllocationID. Request: %s , Response: %s", + allocationID, reply.getAllocationID()))); } } + }, getMainThreadExecutor()); + + // on failure, fail the request future + rmResponse.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { + + @Override + public Void apply(Throwable failure) { + slotRequestToResourceManagerFailed(allocationID, failure); + return null; + } + }, getMainThreadExecutor()); + + return future; + } + + private void slotRequestToResourceManagerSuccess(final AllocationID allocationID) { + // a request is pending from the ResourceManager to a (future) TaskManager + // we only add the watcher here in case that request times out + scheduleRunAsync(new Runnable() { + @Override + public void run() { + checkTimeoutSlotAllocation(allocationID); + } + }, resourceManagerAllocationTimeout); + } + + private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throwable failure) { + PendingRequest request = pendingRequests.remove(allocationID); + if (request != null) { + 
request.future().completeExceptionally(new NoResourceAvailableException( + "No pooled slot available and request to ResourceManager for new slot failed", failure)); + } + } + + private void checkTimeoutSlotAllocation(AllocationID allocationID) { + PendingRequest request = pendingRequests.remove(allocationID); + if (request != null && !request.future().isDone()) { + request.future().completeExceptionally(new TimeoutException("Slot allocation request timed out")); } } @@ -275,123 +372,123 @@ public class SlotPool implements SlotOwner { * slot can be reused by other pending requests if the resource profile matches.n * * @param slot The slot needs to be returned - * @return True if the returning slot been accepted */ - @Override - public boolean returnAllocatedSlot(Slot slot) { + private void internalReturnAllocatedSlot(Slot slot) { checkNotNull(slot); checkArgument(!slot.isAlive(), "slot is still alive"); - checkArgument(slot.getOwner() == this, "slot belongs to the wrong pool."); + checkArgument(slot.getOwner() == providerAndOwner, "slot belongs to the wrong pool."); + // markReleased() is an atomic check-and-set operation, so that the slot is guaranteed + // to be returned only once if (slot.markReleased()) { - synchronized (lock) { - final SlotDescriptor slotDescriptor = allocatedSlots.remove(slot); - if (slotDescriptor != null) { - // check if this TaskManager is valid - if (!registeredResources.contains(slot.getTaskManagerID())) { - return false; - } - - final FlinkCompletableFuture<SlotDescriptor> pendingRequest = pollPendingRequest(slotDescriptor); - if (pendingRequest != null) { - pendingRequest.complete(slotDescriptor); - } - else { - availableSlots.add(slotDescriptor); - } - - return true; + if (allocatedSlots.remove(slot)) { + // this slot allocation is still valid, use the slot to fulfill another request + // or make it available again + final AllocatedSlot taskManagerSlot = slot.getAllocatedSlot(); + final PendingRequest pendingRequest = 
pollMatchingPendingRequest(taskManagerSlot); + + if (pendingRequest != null) { + LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]", + pendingRequest.allocationID(), taskManagerSlot.getSlotAllocationId()); + + pendingRequest.future().complete(createSimpleSlot(taskManagerSlot, Locality.UNKNOWN)); } else { - throw new IllegalArgumentException("Slot was not allocated from this pool."); + LOG.debug("Adding returned slot [{}] to available slots", taskManagerSlot.getSlotAllocationId()); + availableSlots.add(taskManagerSlot, clock.relativeTimeMillis()); } } - } - else { - return false; + else { + LOG.debug("Returned slot's allocation has been failed. Dropping slot."); + } } } - private FlinkCompletableFuture<SlotDescriptor> pollPendingRequest(final SlotDescriptor slotDescriptor) { - for (Map.Entry<AllocationID, Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>>> entry : pendingRequests.entrySet()) { - final Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>> pendingRequest = entry.getValue(); - if (slotDescriptor.getResourceProfile().isMatching(pendingRequest.f0.getResourceProfile())) { - pendingRequests.remove(entry.getKey()); - return pendingRequest.f1; + private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) { + final ResourceProfile slotResources = slot.getResourceProfile(); + + for (PendingRequest request : pendingRequests.values()) { + if (slotResources.isMatching(request.resourceProfile())) { + pendingRequests.remove(request.allocationID()); + return request; } } + + // no request pending, or no request matches return null; } - /** - * Release slot to TaskManager, called for finished tasks or canceled jobs. - * - * @param slot The slot needs to be released. 
- */ - public void releaseSlot(final Slot slot) { - synchronized (lock) { - allocatedSlots.remove(slot); - availableSlots.remove(new SlotDescriptor(slot)); - // TODO: send release request to task manager + @RpcMethod + public Iterable<SlotOffer> offerSlots(Iterable<Tuple2<AllocatedSlot, SlotOffer>> offers) { + validateRunsInMainThread(); + + final ArrayList<SlotOffer> result = new ArrayList<>(); + for (Tuple2<AllocatedSlot, SlotOffer> offer : offers) { + if (offerSlot(offer.f0)) { + result.add(offer.f1); + } } - } + return result.isEmpty() ? Collections.<SlotOffer>emptyList() : result; + } + /** * Slot offering by TaskManager with AllocationID. The AllocationID is originally generated by this pool and * transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation * we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending * request waiting for this slot (maybe fulfilled by some other returned slot). 
* - * @param allocationID The allocation id of the lo - * @param slotDescriptor The offered slot descriptor + * @param slot The offered slot * @return True if we accept the offering */ - public boolean offerSlot(final AllocationID allocationID, final SlotDescriptor slotDescriptor) { - synchronized (lock) { - // check if this TaskManager is valid - final ResourceID resourceID = slotDescriptor.getTaskManagerLocation().getResourceID(); - if (!registeredResources.contains(resourceID)) { - LOG.warn("Allocation[{}] Slot offering from unregistered TaskManager: {}", - allocationID, slotDescriptor); - return false; - } + @RpcMethod + public boolean offerSlot(final AllocatedSlot slot) { + validateRunsInMainThread(); - // check whether we have already using this slot - final Slot allocatedSlot = allocatedSlots.get(allocationID); - if (allocatedSlot != null) { - final SlotDescriptor allocatedSlotDescriptor = new SlotDescriptor(allocatedSlot); + // check if this TaskManager is valid + final ResourceID resourceID = slot.getTaskManagerId(); + final AllocationID allocationID = slot.getSlotAllocationId(); - if (allocatedSlotDescriptor.equals(slotDescriptor)) { - LOG.debug("Allocation[{}] Duplicated slot offering: {}", - allocationID, slotDescriptor); - return true; - } - else { - LOG.info("Allocation[{}] Allocation had been fulfilled by slot {}, rejecting offered slot {}", - allocationID, allocatedSlotDescriptor, slotDescriptor); - return false; - } - } + if (!registeredTaskManagers.contains(resourceID)) { + LOG.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}", + slot.getSlotAllocationId(), slot); + return false; + } - // check whether we already have this slot in free pool - if (availableSlots.contains(slotDescriptor)) { - LOG.debug("Allocation[{}] Duplicated slot offering: {}", - allocationID, slotDescriptor); - return true; - } + // check whether we have already using this slot + if (allocatedSlots.contains(allocationID) || 
availableSlots.contains(allocationID)) { + LOG.debug("Received repeated offer for slot [{}]. Ignoring.", allocationID); - // check whether we have request waiting for this slot - if (pendingRequests.containsKey(allocationID)) { - FlinkCompletableFuture<SlotDescriptor> future = pendingRequests.remove(allocationID).f1; - future.complete(slotDescriptor); - return true; - } + // return true here so that the sender will get a positive acknowledgement to the retry + // and mark the offering as a success + return true; + } - // unwanted slot, rejecting this offer - return false; + // check whether we have request waiting for this slot + PendingRequest pendingRequest = pendingRequests.remove(allocationID); + if (pendingRequest != null) { + // we were waiting for this! + SimpleSlot resultSlot = createSimpleSlot(slot, Locality.UNKNOWN); + pendingRequest.future().complete(resultSlot); + allocatedSlots.add(resultSlot); + } + else { + // we were actually not waiting for this: + // - could be that this request had been fulfilled + // - we are receiving the slots from TaskManagers after becoming leaders + availableSlots.add(slot, clock.relativeTimeMillis()); } + + // we accepted the request in any case. slot will be released after it idled for + // too long and timed out + return true; } + + // TODO - periodic (every minute or so) catch slots that were lost (check all slots, if they have any task active) + + // TODO - release slots that were not used to the resource manager + // ------------------------------------------------------------------------ // Error Handling // ------------------------------------------------------------------------ @@ -405,24 +502,29 @@ public class SlotPool implements SlotOwner { * @param allocationID Represents the allocation which should be failed * @param cause The cause of the failure */ + @RpcMethod public void failAllocation(final AllocationID allocationID, final Exception cause) { - synchronized (lock) { - // 1. 
check whether the allocation still pending - Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>> pendingRequest = - pendingRequests.get(allocationID); - if (pendingRequest != null) { - pendingRequest.f1.completeExceptionally(cause); - return; + final PendingRequest pendingRequest = pendingRequests.remove(allocationID); + if (pendingRequest != null) { + // request was still pending + LOG.debug("Failed pending request [{}] with ", allocationID, cause); + pendingRequest.future().completeExceptionally(cause); + } + else if (availableSlots.tryRemove(allocationID)) { + LOG.debug("Failed available slot [{}] with ", allocationID, cause); + } + else { + Slot slot = allocatedSlots.remove(allocationID); + if (slot != null) { + // release the slot. + // since it is not in 'allocatedSlots' any more, it will be dropped o return' + slot.releaseSlot(); + } + else { + LOG.debug("Outdated request to fail slot [{}] with ", allocationID, cause); } - - // 2. check whether we have a free slot corresponding to this allocation id - // TODO: add allocation id to slot descriptor, so we can remove it by allocation id - - // 3. 
check whether we have a in-use slot corresponding to this allocation id - // TODO: needs mechanism to release the in-use Slot but don't return it back to this pool - - // TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase } + // TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase } // ------------------------------------------------------------------------ @@ -435,10 +537,9 @@ public class SlotPool implements SlotOwner { * * @param resourceID The id of the TaskManager */ - public void registerResource(final ResourceID resourceID) { - synchronized (lock) { - registeredResources.add(resourceID); - } + @RpcMethod + public void registerTaskManager(final ResourceID resourceID) { + registeredTaskManagers.add(resourceID); } /** @@ -447,12 +548,12 @@ public class SlotPool implements SlotOwner { * * @param resourceID The id of the TaskManager */ - public void releaseResource(final ResourceID resourceID) { - synchronized (lock) { - registeredResources.remove(resourceID); - availableSlots.removeByResource(resourceID); + @RpcMethod + public void releaseTaskManager(final ResourceID resourceID) { + if (registeredTaskManagers.remove(resourceID)) { + availableSlots.removeAllForTaskManager(resourceID); - final Set<Slot> allocatedSlotsForResource = allocatedSlots.getSlotsByResource(resourceID); + final Set<Slot> allocatedSlotsForResource = allocatedSlots.removeSlotsForTaskManager(resourceID); for (Slot slot : allocatedSlotsForResource) { slot.releaseSlot(); } @@ -460,24 +561,15 @@ public class SlotPool implements SlotOwner { } // ------------------------------------------------------------------------ - // ResourceManager + // Utilities // ------------------------------------------------------------------------ - public void setResourceManager( - final UUID resourceManagerLeaderId, - final ResourceManagerGateway resourceManagerGateway) - { - synchronized (lock) { - 
this.resourceManagerLeaderId = resourceManagerLeaderId; - this.resourceManagerGateway = resourceManagerGateway; - } - } - - public void disconnectResourceManager() { - synchronized (lock) { - this.resourceManagerLeaderId = null; - this.resourceManagerGateway = null; + private SimpleSlot createSimpleSlot(AllocatedSlot slot, Locality locality) { + SimpleSlot result = new SimpleSlot(slot, providerAndOwner, slot.getSlotNumber()); + if (locality != null) { + result.setLocality(locality); } + return result; } // ------------------------------------------------------------------------ @@ -487,45 +579,34 @@ public class SlotPool implements SlotOwner { /** * Organize allocated slots from different points of view. */ - static class AllocatedSlots { + private static class AllocatedSlots { /** All allocated slots organized by TaskManager's id */ - private final Map<ResourceID, Set<Slot>> allocatedSlotsByResource; - - /** All allocated slots organized by Slot object */ - private final Map<Slot, AllocationID> allocatedSlots; - - /** All allocated slot descriptors organized by Slot object */ - private final Map<Slot, SlotDescriptor> allocatedSlotsWithDescriptor; + private final Map<ResourceID, Set<Slot>> allocatedSlotsByTaskManager; /** All allocated slots organized by AllocationID */ private final Map<AllocationID, Slot> allocatedSlotsById; AllocatedSlots() { - this.allocatedSlotsByResource = new HashMap<>(); - this.allocatedSlots = new HashMap<>(); - this.allocatedSlotsWithDescriptor = new HashMap<>(); + this.allocatedSlotsByTaskManager = new HashMap<>(); this.allocatedSlotsById = new HashMap<>(); } /** - * Add a new allocation + * Adds a new slot to this collection. 
* - * @param allocationID The allocation id - * @param slot The allocated slot + * @param slot The allocated slot */ - void add(final AllocationID allocationID, final SlotDescriptor descriptor, final Slot slot) { - allocatedSlots.put(slot, allocationID); - allocatedSlotsById.put(allocationID, slot); - allocatedSlotsWithDescriptor.put(slot, descriptor); + void add(Slot slot) { + allocatedSlotsById.put(slot.getAllocatedSlot().getSlotAllocationId(), slot); final ResourceID resourceID = slot.getTaskManagerID(); - Set<Slot> slotsForResource = allocatedSlotsByResource.get(resourceID); - if (slotsForResource == null) { - slotsForResource = new HashSet<>(); - allocatedSlotsByResource.put(resourceID, slotsForResource); + Set<Slot> slotsForTaskManager = allocatedSlotsByTaskManager.get(resourceID); + if (slotsForTaskManager == null) { + slotsForTaskManager = new HashSet<>(); + allocatedSlotsByTaskManager.put(resourceID, slotsForTaskManager); } - slotsForResource.add(slot); + slotsForTaskManager.add(slot); } /** @@ -541,11 +622,11 @@ public class SlotPool implements SlotOwner { /** * Check whether we have allocated this slot * - * @param slot The slot needs to checked + * @param slotAllocationId The allocation id of the slot to check * @return True if we contains this slot */ - boolean contains(final Slot slot) { - return allocatedSlots.containsKey(slot); + boolean contains(AllocationID slotAllocationId) { + return allocatedSlotsById.containsKey(slotAllocationId); } /** @@ -553,25 +634,27 @@ public class SlotPool implements SlotOwner { * * @param slot The slot needs to be removed */ - SlotDescriptor remove(final Slot slot) { - final SlotDescriptor descriptor = allocatedSlotsWithDescriptor.remove(slot); - if (descriptor != null) { - final AllocationID allocationID = allocatedSlots.remove(slot); - if (allocationID != null) { - allocatedSlotsById.remove(allocationID); - } else { - throw new IllegalStateException("Bug: maps are inconsistent"); - } + boolean remove(final Slot slot) 
{ + return remove(slot.getAllocatedSlot().getSlotAllocationId()) != null; + } - final ResourceID resourceID = slot.getTaskManagerID(); - final Set<Slot> slotsForResource = allocatedSlotsByResource.get(resourceID); - slotsForResource.remove(slot); - if (slotsForResource.isEmpty()) { - allocatedSlotsByResource.remove(resourceID); + /** + * Remove an allocation with slot. + * + * @param slotId The ID of the slot to be removed + */ + Slot remove(final AllocationID slotId) { + Slot slot = allocatedSlotsById.remove(slotId); + if (slot != null) { + final ResourceID taskManagerId = slot.getTaskManagerID(); + Set<Slot> slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId); + slotsForTM.remove(slot); + if (slotsForTM.isEmpty()) { + allocatedSlotsByTaskManager.get(taskManagerId); } - - return descriptor; - } else { + return slot; + } + else { return null; } } @@ -582,119 +665,326 @@ public class SlotPool implements SlotOwner { * @param resourceID The id of the TaskManager * @return Set of slots which are allocated from the same TaskManager */ - Set<Slot> getSlotsByResource(final ResourceID resourceID) { - Set<Slot> slotsForResource = allocatedSlotsByResource.get(resourceID); - if (slotsForResource != null) { - return new HashSet<>(slotsForResource); + Set<Slot> removeSlotsForTaskManager(final ResourceID resourceID) { + Set<Slot> slotsForTaskManager = allocatedSlotsByTaskManager.remove(resourceID); + if (slotsForTaskManager != null) { + for (Slot slot : slotsForTaskManager) { + allocatedSlotsById.remove(slot.getAllocatedSlot().getSlotAllocationId()); + } + return slotsForTaskManager; } else { - return new HashSet<>(); + return Collections.emptySet(); } } + void clear() { + allocatedSlotsById.clear(); + allocatedSlotsByTaskManager.clear(); + } + @VisibleForTesting boolean containResource(final ResourceID resourceID) { - return allocatedSlotsByResource.containsKey(resourceID); + return allocatedSlotsByTaskManager.containsKey(resourceID); } @VisibleForTesting int size() { - 
return allocatedSlots.size(); + return allocatedSlotsById.size(); } } + // ------------------------------------------------------------------------ + /** * Organize all available slots from different points of view. */ - static class AvailableSlots { + private static class AvailableSlots { /** All available slots organized by TaskManager */ - private final Map<ResourceID, Set<SlotDescriptor>> availableSlotsByResource; + private final HashMap<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager; + + /** All available slots organized by host */ + private final HashMap<String, Set<AllocatedSlot>> availableSlotsByHost; - /** All available slots */ - private final Set<SlotDescriptor> availableSlots; + /** The available slots, with the time when they were inserted */ + private final HashMap<AllocationID, SlotAndTimestamp> availableSlots; AvailableSlots() { - this.availableSlotsByResource = new HashMap<>(); - this.availableSlots = new HashSet<>(); + this.availableSlotsByTaskManager = new HashMap<>(); + this.availableSlotsByHost = new HashMap<>(); + this.availableSlots = new HashMap<>(); } /** - * Add an available slot. + * Adds an available slot. 
* - * @param descriptor The descriptor of the slot + * @param slot The slot to add */ - void add(final SlotDescriptor descriptor) { - availableSlots.add(descriptor); - - final ResourceID resourceID = descriptor.getTaskManagerLocation().getResourceID(); - Set<SlotDescriptor> slotsForResource = availableSlotsByResource.get(resourceID); - if (slotsForResource == null) { - slotsForResource = new HashSet<>(); - availableSlotsByResource.put(resourceID, slotsForResource); + void add(final AllocatedSlot slot, final long timestamp) { + checkNotNull(slot); + + SlotAndTimestamp previous = availableSlots.put( + slot.getSlotAllocationId(), new SlotAndTimestamp(slot, timestamp)); + + if (previous == null) { + final ResourceID resourceID = slot.getTaskManagerLocation().getResourceID(); + final String host = slot.getTaskManagerLocation().getFQDNHostname(); + + Set<AllocatedSlot> slotsForTaskManager = availableSlotsByTaskManager.get(resourceID); + if (slotsForTaskManager == null) { + slotsForTaskManager = new HashSet<>(); + availableSlotsByTaskManager.put(resourceID, slotsForTaskManager); + } + slotsForTaskManager.add(slot); + + Set<AllocatedSlot> slotsForHost = availableSlotsByHost.get(host); + if (slotsForHost == null) { + slotsForHost = new HashSet<>(); + availableSlotsByHost.put(host, slotsForHost); + } + slotsForHost.add(slot); + } + else { + throw new IllegalStateException("slot already contained"); } - slotsForResource.add(descriptor); } /** - * Check whether we have this slot - * - * @param slotDescriptor The descriptor of the slot - * @return True if we contains this slot + * Check whether we have this slot. */ - boolean contains(final SlotDescriptor slotDescriptor) { - return availableSlots.contains(slotDescriptor); + boolean contains(AllocationID slotId) { + return availableSlots.containsKey(slotId); } /** - * Poll a slot which matches the required resource profile + * Poll a slot which matches the required resource profile. 
The polling tries to satisfy the + * location preferences, by TaskManager and by host. * - * @param resourceProfile The required resource profile + * @param resourceProfile The required resource profile. + * @param locationPreferences The location preferences, in order to be checked. + * * @return Slot which matches the resource profile, null if we can't find a match */ - SlotDescriptor poll(final ResourceProfile resourceProfile) { - for (SlotDescriptor slotDescriptor : availableSlots) { - if (slotDescriptor.getResourceProfile().isMatching(resourceProfile)) { - remove(slotDescriptor); - return slotDescriptor; + SlotAndLocality poll(ResourceProfile resourceProfile, Iterable<TaskManagerLocation> locationPreferences) { + // fast path if no slots are available + if (availableSlots.isEmpty()) { + return null; + } + + boolean hadLocationPreference = false; + + if (locationPreferences != null) { + + // first search by TaskManager + for (TaskManagerLocation location : locationPreferences) { + hadLocationPreference = true; + + final Set<AllocatedSlot> onTaskManager = availableSlotsByTaskManager.get(location.getResourceID()); + if (onTaskManager != null) { + for (AllocatedSlot candidate : onTaskManager) { + if (candidate.getResourceProfile().isMatching(resourceProfile)) { + remove(candidate.getSlotAllocationId()); + return new SlotAndLocality(candidate, Locality.LOCAL); + } + } + } + } + + // now, search by host + for (TaskManagerLocation location : locationPreferences) { + final Set<AllocatedSlot> onHost = availableSlotsByHost.get(location.getFQDNHostname()); + if (onHost != null) { + for (AllocatedSlot candidate : onHost) { + if (candidate.getResourceProfile().isMatching(resourceProfile)) { + remove(candidate.getSlotAllocationId()); + return new SlotAndLocality(candidate, Locality.HOST_LOCAL); + } + } + } + } + } + + // take any slot + for (SlotAndTimestamp candidate : availableSlots.values()) { + final AllocatedSlot slot = candidate.slot(); + + if 
(slot.getResourceProfile().isMatching(resourceProfile)) { + remove(slot.getSlotAllocationId()); + return new SlotAndLocality( + slot, hadLocationPreference ? Locality.NON_LOCAL : Locality.UNCONSTRAINED); } } + + // nothing available that matches return null; } /** * Remove all available slots come from specified TaskManager. * - * @param resourceID The id of the TaskManager + * @param taskManager The id of the TaskManager */ - void removeByResource(final ResourceID resourceID) { - final Set<SlotDescriptor> slotsForResource = availableSlotsByResource.remove(resourceID); - if (slotsForResource != null) { - for (SlotDescriptor slotDescriptor : slotsForResource) { - availableSlots.remove(slotDescriptor); + void removeAllForTaskManager(final ResourceID taskManager) { + // remove from the by-TaskManager view + final Set<AllocatedSlot> slotsForTm = availableSlotsByTaskManager.remove(taskManager); + + if (slotsForTm != null && slotsForTm.size() > 0) { + final String host = slotsForTm.iterator().next().getTaskManagerLocation().getFQDNHostname(); + final Set<AllocatedSlot> slotsForHost = availableSlotsByHost.get(host); + + // remove from the base set and the by-host view + for (AllocatedSlot slot : slotsForTm) { + availableSlots.remove(slot.getSlotAllocationId()); + slotsForHost.remove(slot); + } + + if (slotsForHost.isEmpty()) { + availableSlotsByHost.remove(host); } } } - private void remove(final SlotDescriptor slotDescriptor) { - availableSlots.remove(slotDescriptor); + boolean tryRemove(AllocationID slotId) { + final SlotAndTimestamp sat = availableSlots.remove(slotId); + if (sat != null) { + final AllocatedSlot slot = sat.slot(); + final ResourceID resourceID = slot.getTaskManagerLocation().getResourceID(); + final String host = slot.getTaskManagerLocation().getFQDNHostname(); + + final Set<AllocatedSlot> slotsForTm = availableSlotsByTaskManager.get(resourceID); + final Set<AllocatedSlot> slotsForHost = availableSlotsByHost.get(host); + + slotsForTm.remove(slot); + 
slotsForHost.remove(slot); + + if (slotsForTm.isEmpty()) { + availableSlotsByTaskManager.remove(resourceID); + } + if (slotsForHost.isEmpty()) { + availableSlotsByHost.remove(host); + } + + return true; + } + else { + return false; + } + } - final ResourceID resourceID = slotDescriptor.getTaskManagerLocation().getResourceID(); - final Set<SlotDescriptor> slotsForResource = checkNotNull(availableSlotsByResource.get(resourceID)); - slotsForResource.remove(slotDescriptor); - if (slotsForResource.isEmpty()) { - availableSlotsByResource.remove(resourceID); + private void remove(AllocationID slotId) throws IllegalStateException { + if (!tryRemove(slotId)) { + throw new IllegalStateException("slot not contained"); } } @VisibleForTesting - boolean containResource(final ResourceID resourceID) { - return availableSlotsByResource.containsKey(resourceID); + boolean containsTaskManager(ResourceID resourceID) { + return availableSlotsByTaskManager.containsKey(resourceID); } @VisibleForTesting int size() { return availableSlots.size(); } + + @VisibleForTesting + void clear() { + availableSlots.clear(); + availableSlotsByTaskManager.clear(); + availableSlotsByHost.clear(); + } + } + + // ------------------------------------------------------------------------ + + /** + * An implementation of the {@link SlotOwner} and {@link SlotProvider} interfaces + * that delegates methods as RPC calls to the SlotPool's RPC gateway. 
+ */ + private static class ProviderAndOwner implements SlotOwner, SlotProvider { + + private final SlotPoolGateway gateway; + + private final Time timeout; + + ProviderAndOwner(SlotPoolGateway gateway, Time timeout) { + this.gateway = gateway; + this.timeout = timeout; + } + + @Override + public boolean returnAllocatedSlot(Slot slot) { + gateway.returnAllocatedSlot(slot); + return true; + } + + @Override + public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) { + return gateway.allocateSlot( + task, ResourceProfile.UNKNOWN, Collections.<TaskManagerLocation>emptyList(), timeout); + } + } + + // ------------------------------------------------------------------------ + + /** + * A pending request for a slot + */ + private static class PendingRequest { + + private final AllocationID allocationID; + + private final FlinkCompletableFuture<SimpleSlot> future; + + private final ResourceProfile resourceProfile; + + PendingRequest( + AllocationID allocationID, + FlinkCompletableFuture<SimpleSlot> future, + ResourceProfile resourceProfile) { + this.allocationID = allocationID; + this.future = future; + this.resourceProfile = resourceProfile; + } + + public AllocationID allocationID() { + return allocationID; + } + + public FlinkCompletableFuture<SimpleSlot> future() { + return future; + } + + public ResourceProfile resourceProfile() { + return resourceProfile; + } + } + + // ------------------------------------------------------------------------ + + /** + * A slot, together with the timestamp when it was added + */ + private static class SlotAndTimestamp { + + private final AllocatedSlot slot; + + private final long timestamp; + + SlotAndTimestamp( + AllocatedSlot slot, + long timestamp) { + this.slot = slot; + this.timestamp = timestamp; + } + + public AllocatedSlot slot() { + return slot; + } + + public long timestamp() { + return timestamp; + } } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java new file mode 100644 index 0000000..42942ca --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.instance; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcTimeout; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.util.UUID; + +/** + * The gateway for calls on the {@link SlotPool}. + */ +public interface SlotPoolGateway extends RpcGateway { + + // ------------------------------------------------------------------------ + // shutdown + // ------------------------------------------------------------------------ + + void suspend(); + + // ------------------------------------------------------------------------ + // resource manager connection + // ------------------------------------------------------------------------ + + /** + * Connects the SlotPool to the given ResourceManager. After this method is called, the + * SlotPool will be able to request resources from the given ResourceManager. + * + * @param resourceManagerLeaderId The leader session ID of the resource manager. + * @param resourceManagerGateway The RPC gateway for the resource manager. + */ + void connectToResourceManager(UUID resourceManagerLeaderId, ResourceManagerGateway resourceManagerGateway); + + /** + * Disconnects the slot pool from its current Resource Manager. 
After this call, the pool will not + * be able to request further slots from the Resource Manager, and all currently pending requests + * to the resource manager will be canceled. + * + * <p>The slot pool will still be able to serve slots from its internal pool. + */ + void disconnectResourceManager(); + + // ------------------------------------------------------------------------ + // registering / un-registering TaskManagers and slots + // ------------------------------------------------------------------------ + + void registerTaskManager(ResourceID resourceID); + + void releaseTaskManager(ResourceID resourceID); + + Future<Boolean> offerSlot(AllocatedSlot slot); + + Future<Iterable<SlotOffer>> offerSlots(Iterable<Tuple2<AllocatedSlot, SlotOffer>> offers); + + void failAllocation(AllocationID allocationID, Exception cause); + + // ------------------------------------------------------------------------ + // allocating and disposing slots + // ------------------------------------------------------------------------ + + Future<SimpleSlot> allocateSlot( + ScheduledUnit task, + ResourceProfile resources, + Iterable<TaskManagerLocation> locationPreferences, + @RpcTimeout Time timeout); + + void returnAllocatedSlot(Slot slot); +} http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java index ec6e9b1..0ef2482 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java @@ -19,19 +19,19 @@ package org.apache.flink.runtime.jobmanager.scheduler; public enum Locality { - - /** - * No constraint 
existed on the task placement. - */ + + /** No constraint existed on the task placement. */ UNCONSTRAINED, - - /** - * The task was scheduled respecting its locality preferences. - */ + + /** The task was scheduled into the same TaskManager as requested */ LOCAL, - - /** - * The task was scheduled to a destination not included in its locality preferences. - */ - NON_LOCAL + + /** The task was scheduled onto the same host as requested */ + HOST_LOCAL, + + /** The task was scheduled to a destination not included in its locality preferences. */ + NON_LOCAL, + + /** No locality information was provided, it is unknown if the locality was respected */ + UNKNOWN } http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java index e45747b..546f31f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java @@ -54,7 +54,11 @@ public class NoResourceAvailableException extends JobException { public NoResourceAvailableException(String message) { super(message); } - + + public NoResourceAvailableException(String message, Throwable cause) { + super(message, cause); + } + // -------------------------------------------------------------------------------------------- @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java index 9419ab4..f477c49 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java @@ -20,7 +20,9 @@ package org.apache.flink.runtime.jobmanager.slots; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -54,6 +56,9 @@ public class AllocatedSlot { /** TEMP until the new RPC is in place: The actor gateway to communicate with the TaskManager */ private final TaskManagerGateway taskManagerGateway; + /** RPC gateway to call the TaskManager that holds this slot */ + private final TaskExecutorGateway taskExecutorGateway; + /** The number of the slot on the TaskManager to which slot belongs. Purely informational. 
*/ private final int slotNumber; @@ -73,15 +78,23 @@ public class AllocatedSlot { this.slotNumber = slotNumber; this.resourceProfile = checkNotNull(resourceProfile); this.taskManagerGateway = checkNotNull(taskManagerGateway); + this.taskExecutorGateway = null; } - public AllocatedSlot(AllocatedSlot other) { - this.slotAllocationId = other.slotAllocationId; - this.jobID = other.jobID; - this.taskManagerLocation = other.taskManagerLocation; - this.slotNumber = other.slotNumber; - this.resourceProfile = other.resourceProfile; - this.taskManagerGateway = other.taskManagerGateway; + public AllocatedSlot( + AllocationID slotAllocationId, + JobID jobID, + TaskManagerLocation location, + int slotNumber, + ResourceProfile resourceProfile, + TaskExecutorGateway taskExecutorGateway) { + this.slotAllocationId = checkNotNull(slotAllocationId); + this.jobID = checkNotNull(jobID); + this.taskManagerLocation = checkNotNull(location); + this.slotNumber = slotNumber; + this.resourceProfile = checkNotNull(resourceProfile); + this.taskManagerGateway = null; + this.taskExecutorGateway = checkNotNull(taskExecutorGateway); } // ------------------------------------------------------------------------ @@ -96,6 +109,17 @@ public class AllocatedSlot { } /** + * Gets the ID of the TaskManager on which this slot was allocated. + * + * <p>This is equivalent to {@link #getTaskManagerLocation()#getTaskManagerId()}. + * + * @return This slot's TaskManager's ID. + */ + public ResourceID getTaskManagerId() { + return getTaskManagerLocation().getResourceID(); + } + + /** * Returns the ID of the job this allocated slot belongs to. * * @return the ID of the job this allocated slot belongs to @@ -142,8 +166,28 @@ public class AllocatedSlot { return taskManagerGateway; } + public TaskExecutorGateway getTaskExecutorGateway() { + return taskExecutorGateway; + } + // ------------------------------------------------------------------------ + /** + * This always returns a reference hash code. 
+ */ + @Override + public final int hashCode() { + return super.hashCode(); + } + + /** + * This always checks based on reference equality. + */ + @Override + public final boolean equals(Object obj) { + return this == obj; + } + @Override public String toString() { return "AllocatedSlot " + slotAllocationId + " @ " + taskManagerLocation + " - " + slotNumber; http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java deleted file mode 100644 index 5655fc2..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.jobmanager.slots; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; -import org.apache.flink.runtime.instance.SimpleSlot; -import org.apache.flink.runtime.instance.SlotPool; -import org.apache.flink.runtime.instance.SlotProvider; -import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; -import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A simple pool based slot provider with {@link SlotPool} as the underlying storage. - */ -public class PooledSlotProvider implements SlotProvider { - - /** The pool which holds all the slots. */ - private final SlotPool slotPool; - - /** The timeout for allocation. 
*/ - private final Time timeout; - - public PooledSlotProvider(final SlotPool slotPool, final Time timeout) { - this.slotPool = slotPool; - this.timeout = timeout; - } - - @Override - public Future<SimpleSlot> allocateSlot(ScheduledUnit task, - boolean allowQueued) throws NoResourceAvailableException - { - checkNotNull(task); - - final JobID jobID = task.getTaskToExecute().getVertex().getJobId(); - final Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, ResourceProfile.UNKNOWN); - try { - final SimpleSlot slot = future.get(timeout.getSize(), timeout.getUnit()); - return FlinkCompletableFuture.completed(slot); - } catch (InterruptedException e) { - throw new NoResourceAvailableException("Could not allocate a slot because it's interrupted."); - } catch (ExecutionException e) { - throw new NoResourceAvailableException("Could not allocate a slot because some error occurred " + - "during allocation, " + e.getMessage()); - } catch (TimeoutException e) { - throw new NoResourceAvailableException("Could not allocate a slot within time limit: " + timeout); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java new file mode 100644 index 0000000..3fe5346 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager.slots; + +import org.apache.flink.runtime.jobmanager.scheduler.Locality; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A combination of a {@link AllocatedSlot} and a {@link Locality}. + */ +public class SlotAndLocality { + + private final AllocatedSlot slot; + + private final Locality locality; + + public SlotAndLocality(AllocatedSlot slot, Locality locality) { + this.slot = checkNotNull(slot); + this.locality = checkNotNull(locality); + } + + // ------------------------------------------------------------------------ + + public AllocatedSlot slot() { + return slot; + } + + public Locality locality() { + return locality; + } + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "Slot: " + slot + " (" + locality + ')'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 0b3b68e..a620390 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -53,8 +53,8 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.LeaderIdMismatchException; import org.apache.flink.runtime.instance.Slot; -import org.apache.flink.runtime.instance.SlotDescriptor; import org.apache.flink.runtime.instance.SlotPool; +import org.apache.flink.runtime.instance.SlotPoolGateway; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -62,7 +62,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; -import org.apache.flink.runtime.jobmanager.slots.PooledSlotProvider; +import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -91,19 +91,18 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.InstantiationUtil; + import org.slf4j.Logger; import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -160,7 +159,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { private final SlotPool slotPool; - private final Time allocationTimeout; + private final SlotPoolGateway slotPoolGateway; private volatile UUID leaderSessionID; @@ -249,8 +248,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { // register self as job status change listener executionGraph.registerJobStatusListener(new JobManagerJobStatusListener()); - this.slotPool = new SlotPool(executorService); - this.allocationTimeout = Time.of(5, TimeUnit.SECONDS); + this.slotPool = new SlotPool(rpcService, jobGraph.getJobID()); + this.slotPoolGateway = slotPool.getSelf(); this.registeredTaskManagers = new HashMap<>(4); } @@ -272,10 +271,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { */ public void start(final UUID leaderSessionID) throws Exception { if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) { - - // make sure the slot pool now accepts messages for this leader - slotPool.setJobManagerLeaderId(leaderSessionID); - // make sure we receive RPC and async calls super.start(); @@ -305,8 +300,18 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { @RpcMethod public void startJobExecution() { + // double check that the leader status did not change + if (leaderSessionID == null) { + log.info("Aborting job startup - JobManager lost leader status"); + return; + } + log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID()); + // start the slot pool make sure the slot pool now accepts messages for this leader + log.debug("Staring SlotPool component"); + slotPool.start(leaderSessionID); + try { // job is ready to go, try to establish connection with resource manager // - activate leader retrieval for the resource manager @@ -328,7 +333,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> { @Override public void run() { try { - executionGraph.scheduleForExecution(new PooledSlotProvider(slotPool, allocationTimeout)); + executionGraph.scheduleForExecution(slotPool.getSlotProvider()); } catch (Throwable t) { executionGraph.fail(t); @@ -353,27 +358,26 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { return; } - // receive no more messages until started again, should be called before we clear self leader id - ((StartStoppable) getSelf()).stop(); - + // not leader any more - should not accept any leader messages any more leaderSessionID = null; - slotPool.setJobManagerLeaderId(null); - executionGraph.suspend(cause); - // disconnect from resource manager: try { resourceManagerLeaderRetriever.stop(); - } catch (Exception e) { - log.warn("Failed to stop resource manager leader retriever when suspending.", e); + } catch (Throwable t) { + log.warn("Failed to stop resource manager leader retriever when suspending.", t); } - closeResourceManagerConnection(); - // TODO: in the future, the slot pool should not release the resources, so that - // TODO: the TaskManagers offer the resources to the new leader - for (ResourceID taskManagerId : registeredTaskManagers.keySet()) { - slotPool.releaseResource(taskManagerId); - } - registeredTaskManagers.clear(); + // tell the execution graph (JobManager is still processing messages here) + executionGraph.suspend(cause); + + // receive no more messages until started again, should be called before we clear self leader id + ((StartStoppable) getSelf()).stop(); + + // the slot pool stops receiving messages and clears its pooled slots + slotPoolGateway.suspend(); + + // disconnect from resource manager: + closeResourceManagerConnection(); } //---------------------------------------------------------------------------------------------- @@ -452,6 +456,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } } + @RpcMethod public ExecutionState requestPartitionState( 
final UUID leaderSessionID, final IntermediateDataSetID intermediateResultId, @@ -624,9 +629,11 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } @RpcMethod - public Iterable<SlotOffer> offerSlots(final ResourceID taskManagerId, - final Iterable<SlotOffer> slots, final UUID leaderId) throws Exception - { + public Future<Iterable<SlotOffer>> offerSlots( + final ResourceID taskManagerId, + final Iterable<SlotOffer> slots, + final UUID leaderId) throws Exception { + validateLeaderSessionId(leaderSessionID); Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId); @@ -634,20 +641,22 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { throw new Exception("Unknown TaskManager " + taskManagerId); } - final Set<SlotOffer> acceptedSlotOffers = new HashSet<>(4); + final JobID jid = jobGraph.getJobID(); + final TaskManagerLocation taskManagerLocation = taskManager.f0; + final TaskExecutorGateway taskManagerGateway = taskManager.f1; + + final ArrayList<Tuple2<AllocatedSlot, SlotOffer>> slotsAndOffers = new ArrayList<>(); + for (SlotOffer slotOffer : slots) { - final SlotDescriptor slotDescriptor = new SlotDescriptor( - jobGraph.getJobID(), - taskManager.f0, - slotOffer.getSlotIndex(), - slotOffer.getResourceProfile(), - null); // TODO: replace the actor gateway with the new rpc gateway, it's ready (taskManager.f1) - if (slotPool.offerSlot(slotOffer.getAllocationId(), slotDescriptor)) { - acceptedSlotOffers.add(slotOffer); - } + final AllocatedSlot slot = new AllocatedSlot( + slotOffer.getAllocationId(), jid, taskManagerLocation, + slotOffer.getSlotIndex(), slotOffer.getResourceProfile(), + taskManagerGateway); + + slotsAndOffers.add(new Tuple2<>(slot, slotOffer)); } - return acceptedSlotOffers; + return slotPoolGateway.offerSlots(slotsAndOffers); } @RpcMethod @@ -662,7 +671,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { throw new Exception("Unknown TaskManager " + 
taskManagerId); } - slotPool.failAllocation(allocationId, cause); + slotPoolGateway.failAllocation(allocationId, cause); } @RpcMethod @@ -708,7 +717,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { return new RegistrationResponse.Decline("Invalid leader session id"); } - slotPool.registerResource(taskManagerId); + slotPoolGateway.registerTaskManager(taskManagerId); registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway)); return new JMTMRegistrationSuccess(taskManagerId, libraryCacheManager.getBlobServerPort()); } @@ -840,7 +849,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { log.info("JobManager successfully registered at ResourceManager, leader id: {}.", success.getResourceManagerLeaderId()); - slotPool.setResourceManager( + slotPoolGateway.connectToResourceManager( success.getResourceManagerLeaderId(), resourceManagerConnection.getTargetGateway()); } } @@ -852,7 +861,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { resourceManagerConnection.close(); resourceManagerConnection = null; } - slotPool.disconnectResourceManager(); + + slotPoolGateway.disconnectResourceManager(); } private void validateLeaderSessionId(UUID leaderSessionID) throws LeaderIdMismatchException { http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index b971b96..f30e345 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -25,6 +25,7 @@ import org.apache.flink.util.ReflectionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; 
import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -207,6 +208,17 @@ public abstract class RpcEndpoint<C extends RpcGateway> { * @param runnable Runnable to be executed * @param delay The delay after which the runnable will be executed */ + protected void scheduleRunAsync(Runnable runnable, Time delay) { + scheduleRunAsync(runnable, delay.getSize(), delay.getUnit()); + } + + /** + * Execute the runnable in the main thread of the underlying RPC endpoint, with + * a delay of the given number of milliseconds. + * + * @param runnable Runnable to be executed + * @param delay The delay after which the runnable will be executed + */ protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) { ((MainThreadExecutable) self).scheduleRunAsync(runnable, unit.toMillis(delay)); } @@ -255,7 +267,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> { /** * Executor which executes runnables in the main thread context. */ - private class MainThreadExecutor implements Executor { + private static class MainThreadExecutor implements Executor { private final MainThreadExecutable gateway; @@ -264,7 +276,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> { } @Override - public void execute(Runnable runnable) { + public void execute(@Nonnull Runnable runnable) { gateway.runAsync(runnable); } } @@ -277,7 +289,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> { private Class<C> determineSelfGatewayType() { // determine self gateway type - Class c = getClass(); + Class<?> c = getClass(); Class<C> determinedSelfGatewayType; do { determinedSelfGatewayType = ReflectionUtil.getTemplateType1(c);
