Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5091#discussion_r156948978
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing makes it possible to
run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * <p>The SlotSharingManager allows creating a hierarchy of {@link
TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link
SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the
task or the
+ * co-location constraint running in this slot.
+ *
+ * <p>The {@link TaskSlot} hierarchy is implemented by {@link
MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which
can contain
+ * a number of other {@link TaskSlot} and the latter class represents the
leaf nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is assigned a future
+ * {@link SlotContext}. The {@link SlotContext} represents the
allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link
MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link
MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link
AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * <p>Normal slot sharing is represented by a root {@link MultiTaskSlot}
which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link
SingleTaskSlot} represents a different
+ * task.
+ *
+ * <p>Co-location constraints are modeled by adding a {@link
MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we
cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks
will be added to this co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+ private final SlotSharingGroupId slotSharingGroupId;
+
+ // needed to release allocated slots after a complete multi task slot
hierarchy has been released
+ private final AllocatedSlotActions allocatedSlotActions;
+
+ // owner of the slots to which to return them when they are released
from the outside
+ private final SlotOwner slotOwner;
+
+ private final Map<SlotRequestId, TaskSlot> allTaskSlots;
+
+ // Root nodes which have not been completed because the allocated slot
is still pending
+ private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
+
+ // Root nodes which have been completed (the underlying allocated slot
has been assigned)
+ private final Map<TaskManagerLocation, Set<MultiTaskSlot>>
resolvedRootSlots;
+
+ // Internal class to iterate over all resolved root slots
+ private ResolvedRootSlotValues resolvedMultiTaskSlotValues;
+
+ public SlotSharingManager(
+ SlotSharingGroupId slotSharingGroupId,
+ AllocatedSlotActions allocatedSlotActions,
+ SlotOwner slotOwner) {
+ this.slotSharingGroupId =
Preconditions.checkNotNull(slotSharingGroupId);
+ this.allocatedSlotActions =
Preconditions.checkNotNull(allocatedSlotActions);
+ this.slotOwner = Preconditions.checkNotNull(slotOwner);
+
+ allTaskSlots = new HashMap<>(16);
+ unresolvedRootSlots = new HashMap<>(16);
+ resolvedRootSlots = new HashMap<>(16);
+
+ resolvedMultiTaskSlotValues = null;
+ }
+
+ public boolean isEmpty() {
+ return allTaskSlots.isEmpty();
+ }
+
+ public boolean contains(SlotRequestId slotRequestId) {
+ return allTaskSlots.containsKey(slotRequestId);
+ }
+
+ @Nullable
+ public TaskSlot getTaskSlot(SlotRequestId slotRequestId) {
+ return allTaskSlots.get(slotRequestId);
+ }
+
+ /**
+ * Creates a new root slot with the given {@link SlotRequestId}, {@link
SlotContext} future and
+ * the {@link SlotRequestId} of the allocated slot.
+ *
+ * @param slotRequestId of the root slot
+ * @param slotContextFuture with which we create the root slot
+ * @param allocatedSlotRequestId slot request id of the underlying
allocated slot which can be used
+ * to cancel the pending slot request or
release the allocated slot
+ * @return New root slot
+ */
+ public MultiTaskSlot createRootSlot(
+ SlotRequestId slotRequestId,
+ CompletableFuture<SlotContext> slotContextFuture,
+ SlotRequestId allocatedSlotRequestId) {
+ final MultiTaskSlot rootMultiTaskSlot = new MultiTaskSlot(
+ slotRequestId,
+ slotContextFuture,
+ allocatedSlotRequestId);
+
+ allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
+ unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
+
+ // add the root node to the set of resolved root nodes once the
SlotContext future has been completed
+ // and we know the slot's TaskManagerLocation
+ slotContextFuture.whenComplete(
+ (SlotContext slotContext, Throwable throwable) -> {
+ if (slotContext != null) {
+ final MultiTaskSlot resolvedRootNode =
unresolvedRootSlots.remove(slotRequestId);
+
+ if (resolvedRootNode != null) {
+ final Set<MultiTaskSlot>
innerCollection = resolvedRootSlots.computeIfAbsent(
--- End diff --
True. I'll add the necessary synchronization because it's not guaranteed
that the way this future gets completed won't change later on.
---