[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290433#comment-16290433 ]
ASF GitHub Bot commented on FLINK-7956:
---------------------------------------
Github user ifndef-SleePy commented on a diff in the pull request:
https://github.com/apache/flink/pull/5091#discussion_r156621784
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
    // ------------------------------------------------------------------------

    @Override
-   public CompletableFuture<SimpleSlot> allocateSlot(
-           SlotRequestID requestId,
-           ScheduledUnit task,
-           ResourceProfile resources,
-           Iterable<TaskManagerLocation> locationPreferences,
+   public CompletableFuture<LogicalSlot> allocateSlot(
+           SlotRequestId slotRequestId,
+           ScheduledUnit scheduledUnit,
+           ResourceProfile resourceProfile,
+           Collection<TaskManagerLocation> locationPreferences,
+           boolean allowQueuedScheduling,
            Time timeout) {
-       return internalAllocateSlot(requestId, task, resources, locationPreferences);
+       return internalAllocateSlot(
+           slotRequestId,
+           scheduledUnit,
+           resourceProfile,
+           locationPreferences,
+           allowQueuedScheduling);
    }

-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-       internalReturnAllocatedSlot(slot);
+   private CompletableFuture<LogicalSlot> internalAllocateSlot(
+           SlotRequestId slotRequestId,
+           ScheduledUnit task,
+           ResourceProfile resourceProfile,
+           Collection<TaskManagerLocation> locationPreferences,
+           boolean allowQueuedScheduling) {
+
+       final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
+
+       if (slotSharingGroupId != null) {
+           // allocate slot with slot sharing
+           final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
+               slotSharingGroupId,
+               id -> new SlotSharingManager(
+                   id,
+                   this,
+                   providerAndOwner));
+
+           final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture;
+
+           try {
+               if (task.getCoLocationConstraint() != null) {
+                   multiTaskSlotFuture = allocateCoLocatedMultiTaskSlot(
+                       task.getCoLocationConstraint(),
+                       multiTaskSlotManager,
+                       resourceProfile,
+                       locationPreferences,
+                       allowQueuedScheduling);
+               } else {
+                   multiTaskSlotFuture = allocateMultiTaskSlot(
+                       task.getJobVertexId(),
+                       multiTaskSlotManager,
+                       resourceProfile,
+                       locationPreferences,
+                       allowQueuedScheduling);
+               }
+           } catch (NoResourceAvailableException noResourceException) {
+               return FutureUtils.completedExceptionally(noResourceException);
+           }
+
+           // sanity check
+           Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
+
+           final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
+               slotRequestId,
+               task.getJobVertexId(),
+               multiTaskSlotFuture.getLocality());
+
+           return leave.getLogicalSlotFuture();
+       } else {
+           // request an allocated slot to assign a single logical slot to
+           CompletableFuture<SlotAndLocality> slotAndLocalityFuture = requestAllocatedSlot(
+               slotRequestId,
+               resourceProfile,
+               locationPreferences,
+               allowQueuedScheduling);
+
+           return slotAndLocalityFuture.thenApply(
+               (SlotAndLocality slotAndLocality) -> {
+                   final AllocatedSlot allocatedSlot = slotAndLocality.getSlot();
+
+                   final SingleLogicalSlot singleTaskSlot = new SingleLogicalSlot(
+                       slotRequestId,
+                       allocatedSlot,
+                       null,
+                       slotAndLocality.getLocality(),
+                       providerAndOwner);
+
+                   if (allocatedSlot.tryAssignPayload(singleTaskSlot)) {
+                       return singleTaskSlot;
+                   } else {
+                       final FlinkException flinkException = new FlinkException("Could not assign payload to allocated slot " + allocatedSlot.getAllocationId() + '.');
+                       releaseSlot(slotRequestId, null, flinkException);
+                       throw new CompletionException(flinkException);
+                   }
+               });
+       }
    }

-   @Override
-   public CompletableFuture<Acknowledge> cancelSlotAllocation(SlotRequestID requestId) {
-       final PendingRequest pendingRequest = removePendingRequest(requestId);
+   /**
+    * Allocates a co-located {@link SlotSharingManager.MultiTaskSlot} for the given {@link CoLocationConstraint}.
+    *
+    * <p>If allowQueuedScheduling is true, then the returned {@link SlotSharingManager.MultiTaskSlot} can be
+    * uncompleted.
+    *
+    * @param coLocationConstraint for which to allocate a {@link SlotSharingManager.MultiTaskSlot}
+    * @param multiTaskSlotManager responsible for the slot sharing group for which to allocate the slot
+    * @param resourceProfile specifying the requirements for the requested slot
+    * @param locationPreferences containing preferred TaskExecutors on which to allocate the slot
+    * @param allowQueuedScheduling true if queued scheduling (the returned task slot must not be completed yet) is allowed, otherwise false
+    * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which contains the allocated {@link SlotSharingManager.MultiTaskSlot}
+    * and its locality wrt the given location preferences
+    * @throws NoResourceAvailableException if no task slot could be allocated
+    */
+   private SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot(
+           CoLocationConstraint coLocationConstraint,
+           SlotSharingManager multiTaskSlotManager,
+           ResourceProfile resourceProfile,
+           Collection<TaskManagerLocation> locationPreferences,
+           boolean allowQueuedScheduling) throws NoResourceAvailableException {
+       final SlotRequestId coLocationSlotRequestId = coLocationConstraint.getSlotRequestId();
+
+       if (coLocationSlotRequestId != null) {
+           // we have a slot assigned --> try to retrieve it
+           final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(coLocationSlotRequestId);
+
+           if (taskSlot != null) {
+               Preconditions.checkState(taskSlot instanceof SlotSharingManager.MultiTaskSlot);
+               return SlotSharingManager.MultiTaskSlotLocality.of(((SlotSharingManager.MultiTaskSlot) taskSlot), Locality.LOCAL);
+           } else {
+               // the slot may have been cancelled in the mean time
+               coLocationConstraint.setSlotRequestId(null);
+           }
+       }

-       if (pendingRequest != null) {
-           failPendingRequest(pendingRequest, new CancellationException("Allocation with request id" + requestId + " cancelled."));
+       final Collection<TaskManagerLocation> actualLocationPreferences;
+
+       if (coLocationConstraint.isAssigned()) {
+           actualLocationPreferences = Collections.singleton(coLocationConstraint.getLocation());
        } else {
-           final Slot slot = allocatedSlots.get(requestId);
+           actualLocationPreferences = locationPreferences;
+       }
+
+       // get a new multi task slot
+       final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = allocateMultiTaskSlot(
+           coLocationConstraint.getGroupId(),
+           multiTaskSlotManager,
+           resourceProfile,
+           actualLocationPreferences,
+           allowQueuedScheduling);
+
+       // check whether we fulfill the co-location constraint
+       if (coLocationConstraint.isAssigned() && multiTaskSlotLocality.getLocality() != Locality.LOCAL) {
+           multiTaskSlotLocality.getMultiTaskSlot().release(
+               new FlinkException("Multi task slot is not local and, thus, does not fulfill the co-location constraint."));

-           if (slot != null) {
-               LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", slot, requestId);
-               if (slot.markCancelled()) {
-                   internalReturnAllocatedSlot(slot);
+           throw new NoResourceAvailableException("Could not allocate a local multi task slot for the " +
+               "co location constraint " + coLocationConstraint + '.');
+       }
+
+       final SlotRequestId slotRequestId = new SlotRequestId();
+       final SlotSharingManager.MultiTaskSlot coLocationSlot = multiTaskSlotLocality.getMultiTaskSlot().allocateMultiTaskSlot(
+           slotRequestId,
+           coLocationConstraint.getGroupId());
+
+       // mark the requested slot as co-located slot for other co-located tasks
+       coLocationConstraint.setSlotRequestId(slotRequestId);
+
+       // lock the co-location constraint once we have obtained the allocated slot
+       coLocationSlot.getSlotContextFuture().whenComplete(
+           (SlotContext slotContext, Throwable throwable) -> {
+               if (throwable == null) {
+                   // check whether we are still assigned to the co-location constraint
+                   if (Objects.equals(coLocationConstraint.getSlotRequestId(), slotRequestId)) {
+                       coLocationConstraint.lockLocation(slotContext.getTaskManagerLocation());
+                   }
                }
+           });
+
+       return SlotSharingManager.MultiTaskSlotLocality.of(coLocationSlot, multiTaskSlotLocality.getLocality());
+   }
+
+   /**
+    * Allocates a {@link SlotSharingManager.MultiTaskSlot} for the given groupId which is in the
+    * slot sharing group for which the given {@link SlotSharingManager} is responsible.
+    *
+    * <p>If allowQueuedScheduling is true, then the method can return an uncompleted {@link SlotSharingManager.MultiTaskSlot}.
+    *
+    * @param groupId for which to allocate a new {@link SlotSharingManager.MultiTaskSlot}
+    * @param slotSharingManager responsible for the slot sharing group for which to allocate the slot
+    * @param resourceProfile specifying the requirements for the requested slot
+    * @param locationPreferences containing preferred TaskExecutors on which to allocate the slot
+    * @param allowQueuedScheduling true if queued scheduling (the returned task slot must not be completed yet) is allowed, otherwise false
+    * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which contains the allocated {@link SlotSharingManager.MultiTaskSlot}
+    * and its locality wrt the given location preferences
+    * @throws NoResourceAvailableException if no task slot could be allocated
+    */
+   private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(
+           AbstractID groupId,
+           SlotSharingManager slotSharingManager,
+           ResourceProfile resourceProfile,
+           Collection<TaskManagerLocation> locationPreferences,
+           boolean allowQueuedScheduling) throws NoResourceAvailableException {
+
+       // check first whether we have a resolved root slot which we can use
+       SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = slotSharingManager.getResolvedRootSlot(
+           groupId,
+           locationPreferences);
+
+       if (multiTaskSlotLocality != null && multiTaskSlotLocality.getLocality() == Locality.LOCAL) {
+           return multiTaskSlotLocality;
+       }
+
+       final SlotRequestId allocatedSlotRequestId = new SlotRequestId();
+       final SlotRequestId multiTaskSlotRequestId = new SlotRequestId();
+
+       // check whether we have an allocated slot available which we can use to create a new multi task slot in
+       final SlotAndLocality slotAndLocality = pollAndAllocateSlot(allocatedSlotRequestId, resourceProfile, locationPreferences);
+
+       if (slotAndLocality != null && (slotAndLocality.getLocality() == Locality.LOCAL || multiTaskSlotLocality == null)) {
+
+           final AllocatedSlot allocatedSlot = slotAndLocality.getSlot();
+           final SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.createRootSlot(
+               multiTaskSlotRequestId,
+               CompletableFuture.completedFuture(slotAndLocality.getSlot()),
+               allocatedSlotRequestId);
+
+           if (allocatedSlot.tryAssignPayload(multiTaskSlot)) {
+               return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, slotAndLocality.getLocality());
            } else {
-               LOG.debug("There was no slot allocation with {} to be cancelled.", requestId);
+               multiTaskSlot.release(new FlinkException("Could not assign payload to allocated slot " +
+                   allocatedSlot.getAllocationId() + '.'));
            }
        }

-       return CompletableFuture.completedFuture(Acknowledge.get());
-   }
+       if (multiTaskSlotLocality != null) {
+           // prefer slot sharing group slots over unused slots
+           if (slotAndLocality != null) {
+               releaseSlot(
+                   allocatedSlotRequestId,
+                   null,
+                   new FlinkException("Locality constraint is not better fulfilled by allocated slot."));
+           }
+           return multiTaskSlotLocality;
--- End diff --
If multiTaskSlotLocality != null, multiTaskSlotLocality.getLocality() != LOCAL,
and slotAndLocality == null, this will return multiTaskSlotLocality even though
its locality is not LOCAL. Is that intended?
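
To make the case concrete, here is a minimal, self-contained sketch of the branch
ordering as I read it. The enum, method, and return strings below are simplified
stand-ins for illustration only, not the actual Flink types, and the
tryAssignPayload failure path is omitted:

    // Sketch of the selection order in allocateMultiTaskSlot (simplified).
    public class MultiTaskSlotSelectionSketch {

        enum Locality { LOCAL, NON_LOCAL }

        static String select(
                Locality resolvedRootLocality,   // null if no resolved root slot exists
                Locality polledSlotLocality) {   // null if pollAndAllocateSlot returned nothing

            // 1) a resolved root slot that is already LOCAL wins immediately
            if (resolvedRootLocality == Locality.LOCAL) {
                return "resolved root slot (LOCAL)";
            }

            // 2) a polled allocated slot is used if it is LOCAL, or if there
            //    is no resolved root slot to fall back to
            if (polledSlotLocality != null
                    && (polledSlotLocality == Locality.LOCAL || resolvedRootLocality == null)) {
                return "new root slot from polled allocated slot";
            }

            // 3) otherwise fall back to the resolved root slot, even when its
            //    locality is not LOCAL and no polled slot was available --
            //    this is the case in question
            if (resolvedRootLocality != null) {
                return "resolved root slot (non-LOCAL)";
            }

            return "request a new slot (queued scheduling path)";
        }

        public static void main(String[] args) {
            // resolved root exists but is not LOCAL, and no allocated slot was polled
            System.out.println(select(Locality.NON_LOCAL, null));
            // prints: resolved root slot (non-LOCAL)
        }
    }

As far as the shown hunk goes, the caller then receives a slot that does not
satisfy the location preferences, with no further attempt to request a new one.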
> Add support for scheduling with slot sharing
> --------------------------------------------
>
> Key: FLINK-7956
> URL: https://issues.apache.org/jira/browse/FLINK-7956
> Project: Flink
> Issue Type: Sub-task
> Components: Scheduler
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Labels: flip-6
>
> In order to reach feature equivalence with the old code base, we should add
> support for scheduling with slot sharing to the {{SlotPool}}. This will also
> allow us to run all the IT cases based on the {{AbstractTestBase}} on the
> Flip-6 {{MiniCluster}}.
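
For readers following along: slot sharing and co-location are declared on the
JobGraph side, which is what the SlotPool changes above have to serve. A minimal
sketch, assuming the runtime JobVertex API (JobVertex#setSlotSharingGroup and
JobVertex#setStrictlyCoLocatedWith; the two vertices here are placeholders for
illustration):

    import org.apache.flink.runtime.jobgraph.JobVertex;
    import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;

    public class SlotSharingDeclarationSketch {
        public static void main(String[] args) {
            JobVertex source = new JobVertex("source");
            JobVertex mapper = new JobVertex("mapper");

            // subtasks of both vertices may be deployed into the same slots
            SlotSharingGroup sharingGroup = new SlotSharingGroup();
            source.setSlotSharingGroup(sharingGroup);
            mapper.setSlotSharingGroup(sharingGroup);

            // additionally force the i-th mapper subtask into the same slot as
            // the i-th source subtask (sets up a co-location constraint, which
            // the allocateCoLocatedMultiTaskSlot path above handles)
            mapper.setStrictlyCoLocatedWith(source);
        }
    }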
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)