GitHub user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5091#discussion_r155604499
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// ------------------------------------------------------------------------
@Override
- public CompletableFuture<SimpleSlot> allocateSlot(
- SlotRequestID requestId,
- ScheduledUnit task,
- ResourceProfile resources,
- Iterable<TaskManagerLocation> locationPreferences,
+ public CompletableFuture<LogicalSlot> allocateSlot(
+ SlotRequestId slotRequestId,
+ ScheduledUnit scheduledUnit,
+ ResourceProfile resourceProfile,
+ Collection<TaskManagerLocation> locationPreferences,
+ boolean allowQueuedScheduling,
Time timeout) {
- return internalAllocateSlot(requestId, task, resources, locationPreferences);
+ return internalAllocateSlot(
+ slotRequestId,
+ scheduledUnit,
+ resourceProfile,
+ locationPreferences,
+ allowQueuedScheduling);
}
- @Override
- public void returnAllocatedSlot(Slot slot) {
- internalReturnAllocatedSlot(slot);
+ private CompletableFuture<LogicalSlot> internalAllocateSlot(
+ SlotRequestId slotRequestId,
+ ScheduledUnit task,
+ ResourceProfile resourceProfile,
+ Collection<TaskManagerLocation> locationPreferences,
+ boolean allowQueuedScheduling) {
+
+ final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
+
+ if (slotSharingGroupId != null) {
+ // allocate slot with slot sharing
+ final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
+ slotSharingGroupId,
+ id -> new SlotSharingManager(
+ id,
+ this,
+ providerAndOwner));
+
+ final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture;
--- End diff --
The variable name is confusing: `multiTaskSlotFuture` is of type `SlotSharingManager.MultiTaskSlotLocality`, not `Future`.
---