Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5091#discussion_r155751694
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
//
------------------------------------------------------------------------
@Override
- public CompletableFuture<SimpleSlot> allocateSlot(
- SlotRequestID requestId,
- ScheduledUnit task,
- ResourceProfile resources,
- Iterable<TaskManagerLocation> locationPreferences,
+ public CompletableFuture<LogicalSlot> allocateSlot(
+ SlotRequestId slotRequestId,
+ ScheduledUnit scheduledUnit,
+ ResourceProfile resourceProfile,
+ Collection<TaskManagerLocation> locationPreferences,
+ boolean allowQueuedScheduling,
Time timeout) {
- return internalAllocateSlot(requestId, task, resources,
locationPreferences);
+ return internalAllocateSlot(
+ slotRequestId,
+ scheduledUnit,
+ resourceProfile,
+ locationPreferences,
+ allowQueuedScheduling);
}
- @Override
- public void returnAllocatedSlot(Slot slot) {
- internalReturnAllocatedSlot(slot);
+ private CompletableFuture<LogicalSlot> internalAllocateSlot(
+ SlotRequestId slotRequestId,
+ ScheduledUnit task,
+ ResourceProfile resourceProfile,
+ Collection<TaskManagerLocation> locationPreferences,
+ boolean allowQueuedScheduling) {
+
+ final SlotSharingGroupId slotSharingGroupId =
task.getSlotSharingGroupId();
+
+ if (slotSharingGroupId != null) {
+ // allocate slot with slot sharing
+ final SlotSharingManager multiTaskSlotManager =
slotSharingManagers.computeIfAbsent(
+ slotSharingGroupId,
+ id -> new SlotSharingManager(
+ id,
+ this,
+ providerAndOwner));
+
+ final SlotSharingManager.MultiTaskSlotLocality
multiTaskSlotFuture;
+
+ try {
+ if (task.getCoLocationConstraint() != null) {
+ multiTaskSlotFuture =
allocateCoLocatedMultiTaskSlot(
+ task.getCoLocationConstraint(),
+ multiTaskSlotManager,
+ resourceProfile,
+ locationPreferences,
+ allowQueuedScheduling);
+ } else {
+ multiTaskSlotFuture =
allocateMultiTaskSlot(
+ task.getJobVertexId(),
multiTaskSlotManager,
+ resourceProfile,
+ locationPreferences,
+ allowQueuedScheduling);
+ }
+ } catch (NoResourceAvailableException
noResourceException) {
+ return
FutureUtils.completedExceptionally(noResourceException);
+ }
+
+ // sanity check
+
Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
+
+ final SlotSharingManager.SingleTaskSlot leave =
multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
+ slotRequestId,
+ task.getJobVertexId(),
+ multiTaskSlotFuture.getLocality());
+
+ return leave.getLogicalSlotFuture();
+ } else {
+ // request an allocated slot to assign a single logical
slot to
+ CompletableFuture<SlotAndLocality>
slotAndLocalityFuture = requestAllocatedSlot(
+ slotRequestId,
+ resourceProfile,
+ locationPreferences,
+ allowQueuedScheduling);
+
+ return slotAndLocalityFuture.thenApply(
+ (SlotAndLocality slotAndLocality) -> {
+ final AllocatedSlot allocatedSlot =
slotAndLocality.getSlot();
+
+ final SingleLogicalSlot singleTaskSlot
= new SingleLogicalSlot(
+ slotRequestId,
+ allocatedSlot,
+ null,
+ slotAndLocality.getLocality(),
+ providerAndOwner);
+
+ if
(allocatedSlot.tryAssignPayload(singleTaskSlot)) {
+ return singleTaskSlot;
+ } else {
+ final FlinkException
flinkException = new FlinkException("Could not assign payload to allocated slot
" + allocatedSlot.getAllocationId() + '.');
+ releaseSlot(slotRequestId,
null, flinkException);
+ throw new
CompletionException(flinkException);
+ }
+ });
+ }
}
- @Override
- public CompletableFuture<Acknowledge>
cancelSlotAllocation(SlotRequestID requestId) {
- final PendingRequest pendingRequest =
removePendingRequest(requestId);
+ /**
+ * Allocates a co-located {@link SlotSharingManager.MultiTaskSlot} for
the given {@link CoLocationConstraint}.
+ *
+ * <p>If allowQueuedScheduling is true, then the returned {@link
SlotSharingManager.MultiTaskSlot} can be
+ * uncompleted.
+ *
+ * @param coLocationConstraint for which to allocate a {@link
SlotSharingManager.MultiTaskSlot}
+ * @param multiTaskSlotManager responsible for the slot sharing group
for which to allocate the slot
+ * @param resourceProfile specifying the requirements for the requested
slot
+ * @param locationPreferences containing preferred TaskExecutors on
which to allocate the slot
+ * @param allowQueuedScheduling true if queued scheduling (the returned
task slot must not be completed yet) is allowed, otherwise false
+ * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which
contains the allocated{@link SlotSharingManager.MultiTaskSlot}
+ * and its locality wrt the given location preferences
+ * @throws NoResourceAvailableException if no task slot could be
allocated
+ */
+ private SlotSharingManager.MultiTaskSlotLocality
allocateCoLocatedMultiTaskSlot(
+ CoLocationConstraint coLocationConstraint,
+ SlotSharingManager multiTaskSlotManager,
+ ResourceProfile resourceProfile,
+ Collection<TaskManagerLocation> locationPreferences,
+ boolean allowQueuedScheduling) throws
NoResourceAvailableException {
+ final SlotRequestId coLocationSlotRequestId =
coLocationConstraint.getSlotRequestId();
+
+ if (coLocationSlotRequestId != null) {
+ // we have a slot assigned --> try to retrieve it
+ final SlotSharingManager.TaskSlot taskSlot =
multiTaskSlotManager.getTaskSlot(coLocationSlotRequestId);
+
+ if (taskSlot != null) {
+ Preconditions.checkState(taskSlot instanceof
SlotSharingManager.MultiTaskSlot);
+ return
SlotSharingManager.MultiTaskSlotLocality.of(((SlotSharingManager.MultiTaskSlot)
taskSlot), Locality.LOCAL);
+ } else {
+ // the slot may have been cancelled in the mean
time
+ coLocationConstraint.setSlotRequestId(null);
+ }
+ }
- if (pendingRequest != null) {
- failPendingRequest(pendingRequest, new
CancellationException("Allocation with request id" + requestId + "
cancelled."));
+ final Collection<TaskManagerLocation> actualLocationPreferences;
+
+ if (coLocationConstraint.isAssigned()) {
+ actualLocationPreferences =
Collections.singleton(coLocationConstraint.getLocation());
} else {
- final Slot slot = allocatedSlots.get(requestId);
+ actualLocationPreferences = locationPreferences;
+ }
+
+ // get a new multi task slot
+ final SlotSharingManager.MultiTaskSlotLocality
multiTaskSlotLocality = allocateMultiTaskSlot(
+ coLocationConstraint.getGroupId(), multiTaskSlotManager,
+ resourceProfile,
+ actualLocationPreferences,
+ allowQueuedScheduling);
+
+ // check whether we fulfill the co-location constraint
+ if (coLocationConstraint.isAssigned() &&
multiTaskSlotLocality.getLocality() != Locality.LOCAL) {
+ multiTaskSlotLocality.getMultiTaskSlot().release(
+ new FlinkException("Multi task slot is not
local and, thus, does not fulfill the co-location constraint."));
- if (slot != null) {
- LOG.info("Returning allocated slot {} because
the corresponding allocation request {} was cancelled.", slot, requestId);
- if (slot.markCancelled()) {
- internalReturnAllocatedSlot(slot);
+ throw new NoResourceAvailableException("Could not
allocate a local multi task slot for the " +
+ "co location constraint " +
coLocationConstraint + '.');
+ }
+
+ final SlotRequestId slotRequestId = new SlotRequestId();
+ final SlotSharingManager.MultiTaskSlot coLocationSlot =
multiTaskSlotLocality.getMultiTaskSlot().allocateMultiTaskSlot(
+ slotRequestId,
+ coLocationConstraint.getGroupId());
+
+ // mark the requested slot as co-located slot for other
co-located tasks
+ coLocationConstraint.setSlotRequestId(slotRequestId);
+
+ // lock the co-location constraint once we have obtained the
allocated slot
+ coLocationSlot.getSlotContextFuture().whenComplete(
+ (SlotContext slotContext, Throwable throwable) -> {
+ if (throwable == null) {
--- End diff --
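Is it acceptable to swallow the `Throwable` here? When `throwable` is non-null, the callback appears to do nothing with it, so a failed slot context future would be dropped silently.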
---