Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5091#discussion_r155507294
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to
run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * <p>The SlotSharingManager allows to create a hierarchy of {@link
TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link
SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the
task or the
+ * co-location constraint running in this slot.
+ *
+ * <p>The {@link TaskSlot} hierarchy is implemented by {@link
MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which
can contain
+ * a number of other {@link TaskSlot} and the latter class represents the
leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the
allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link
MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link
MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link
AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * <p>Normal slot sharing is represented by a root {@link MultiTaskSlot}
which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link
SingleTaskSlot} represents a different
+ * task.
+ *
+ * <p>Co-location constraints are modeled by adding a {@link
MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we
cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks
will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+ private final SlotSharingGroupId slotSharingGroupId;
+
+ // needed to release allocated slots after a complete multi task slot
hierarchy has been released
+ private final AllocatedSlotActions allocatedSlotActions;
+
+ // owner of the slots to which to return them when they are released
from the outside
+ private final SlotOwner slotOwner;
+
+ private final Map<SlotRequestId, TaskSlot> allTaskSlots;
+
+ // Root nodes which have not been completed because the allocated slot
is still pending
+ private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
+
+ // Root nodes which have been completed (the underlying allocated slot
has been assigned)
+ private final Map<TaskManagerLocation, Set<MultiTaskSlot>>
resolvedRootSlots;
+
+ // Internal class to iterate over all resolved root slots
+ private ResolvedRootSlotValues resolvedMultiTaskSlotValues;
+
+ public SlotSharingManager(
+ SlotSharingGroupId slotSharingGroupId,
+ AllocatedSlotActions allocatedSlotActions,
+ SlotOwner slotOwner) {
+ this.slotSharingGroupId =
Preconditions.checkNotNull(slotSharingGroupId);
+ this.allocatedSlotActions =
Preconditions.checkNotNull(allocatedSlotActions);
+ this.slotOwner = Preconditions.checkNotNull(slotOwner);
+
+ allTaskSlots = new HashMap<>(16);
+ unresolvedRootSlots = new HashMap<>(16);
+ resolvedRootSlots = new HashMap<>(16);
+
+ resolvedMultiTaskSlotValues = null;
+ }
+
+ public boolean isEmpty() {
+ return allTaskSlots.isEmpty();
+ }
+
+ public boolean contains(SlotRequestId slotRequestId) {
+ return allTaskSlots.containsKey(slotRequestId);
+ }
+
+ @Nullable
+ public TaskSlot getTaskSlot(SlotRequestId slotRequestId) {
+ return allTaskSlots.get(slotRequestId);
+ }
+
+ /**
+ * Creates a new root slot with the given {@link SlotRequestId}, {@link
SlotContext} future and
+ * the {@link SlotRequestId} of the allocated slot.
+ *
+ * @param slotRequestId of the root slot
+ * @param slotContextFuture with which we create the root slot
+ * @param allocatedSlotRequestId slot request id of the underlying
allocated slot which can be used
+ * to cancel the pending slot request or
release the allocated slot
+ * @return New root slot
+ */
+ public MultiTaskSlot createRootSlot(
+ SlotRequestId slotRequestId,
+ CompletableFuture<SlotContext> slotContextFuture,
+ SlotRequestId allocatedSlotRequestId) {
+ final MultiTaskSlot rootMultiTaskSlot = new MultiTaskSlot(
+ slotRequestId,
+ slotContextFuture,
+ allocatedSlotRequestId);
+
+ allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
+ unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
+
+ // add the root node to the set of resolved root nodes once the
SlotContext future has been completed
+ // and we know the slot's TaskManagerLocation
+ slotContextFuture.whenComplete(
+ (SlotContext slotContext, Throwable throwable) -> {
+ if (slotContext != null) {
+ final MultiTaskSlot resolvedRootNode =
unresolvedRootSlots.remove(slotRequestId);
+
+ if (resolvedRootNode != null) {
+ final Set<MultiTaskSlot>
innerCollection = resolvedRootSlots.computeIfAbsent(
+
slotContext.getTaskManagerLocation(),
+ taskManagerLocation ->
new HashSet<>(4));
+
+
innerCollection.add(resolvedRootNode);
+ }
+ } else {
+ rootMultiTaskSlot.release(throwable);
+ }
+ });
+
+ return rootMultiTaskSlot;
+ }
+
+ /**
+ * Gets a resolved root slot which does not yet contain the given
groupId. First the given set of
+ * preferred locations is checked.
+ *
+ * @param groupId which the returned slot must not contain
+ * @param locationPreferences specifying which locations are preferred
+ * @return the resolved root slot and its locality wrt to the specified
location preferences
+ * or null if there was no root slot which did not contain
the given groupId
+ */
+ @Nullable
+ public MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId,
Collection<TaskManagerLocation> locationPreferences) {
+ Preconditions.checkNotNull(locationPreferences);
+
+ if (locationPreferences.isEmpty()) {
+ return
getResolvedRootSlotWithoutLocationPreferences(groupId);
+ } else {
+ return
getResolvedRootSlotWithLocationPreferences(groupId, locationPreferences);
+ }
+ }
+
+ /**
+ * Gets a resolved root slot which does not yet contain the given
groupId. The method will try to
+ * find a slot of a TaskManager contained in the collection of
preferred locations. If there is no such slot
+ * with free capacities available, then the method will look for slots
of TaskManager which run on the same
+ * machine as the TaskManager in the collection of preferred locations.
If there is no such slot, then any slot
+ * with free capacities is returned. If there is no such slot, then
null is returned.
+ *
+ * @param groupId which the returned slot must not contain
+ * @param locationPreferences specifying which locations are preferred
+ * @return the resolved root slot and its locality wrt to the specified
location preferences
+ * or null if there was not root slot which did not
contain the given groupId
+ */
+ @Nullable
+ private MultiTaskSlotLocality
getResolvedRootSlotWithLocationPreferences(AbstractID groupId,
Collection<TaskManagerLocation> locationPreferences) {
+ Preconditions.checkNotNull(groupId);
+ Preconditions.checkNotNull(locationPreferences);
+ final Set<String> hostnameSet = new HashSet<>();
+
+ for (TaskManagerLocation locationPreference :
locationPreferences) {
+ final Set<MultiTaskSlot> multiTaskSlots =
resolvedRootSlots.get(locationPreference);
+
+ if (multiTaskSlots != null) {
+ for (MultiTaskSlot multiTaskSlot :
multiTaskSlots) {
+ if (!multiTaskSlot.contains(groupId)) {
+ return
MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL);
+ }
+ }
+
+
hostnameSet.add(locationPreference.getHostname());
+ }
+ }
+
+ MultiTaskSlot nonLocalMultiTaskSlot = null;
+
+ for (Map.Entry<TaskManagerLocation, Set<MultiTaskSlot>>
taskManagerLocationSetEntry : resolvedRootSlots.entrySet()) {
+ if
(hostnameSet.contains(taskManagerLocationSetEntry.getKey().getHostname())) {
+ for (MultiTaskSlot multiTaskSlot :
taskManagerLocationSetEntry.getValue()) {
+ if (!multiTaskSlot.contains(groupId)) {
+ return
MultiTaskSlotLocality.of(multiTaskSlot, Locality.HOST_LOCAL);
+ }
+ }
+ } else if (nonLocalMultiTaskSlot == null) {
+ for (MultiTaskSlot multiTaskSlot :
taskManagerLocationSetEntry.getValue()) {
+ if (!multiTaskSlot.contains(groupId)) {
+ nonLocalMultiTaskSlot =
multiTaskSlot;
+ }
+ }
+ }
+ }
+
+ if (nonLocalMultiTaskSlot != null) {
+ return MultiTaskSlotLocality.of(nonLocalMultiTaskSlot,
Locality.NON_LOCAL);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Gets a resolved slot which does not yet contain the given groupId
without any location
+ * preferences.
+ *
+ * @param groupId which the returned slot must not contain
+ * @return the resolved slot or null if there was no root slot with
free capacities
+ */
+ @Nullable
+ private MultiTaskSlotLocality
getResolvedRootSlotWithoutLocationPreferences(AbstractID groupId) {
+ Preconditions.checkNotNull(groupId);
+
+ for (Set<MultiTaskSlot> multiTaskSlots :
resolvedRootSlots.values()) {
+ for (MultiTaskSlot multiTaskSlot : multiTaskSlots) {
+ if (!multiTaskSlot.contains(groupId)) {
+ return
MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNCONSTRAINED);
+ }
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Gets an unresolved slot which does not yet contain the given
groupId. An unresolved
+ * slot is a slot whose underlying allocated slot has not been
allocated yet.
+ *
+ * @param groupId which the returned slot must not contain
+ * @return the unresolved slot or null if there was no root slot with
free capacities
+ */
+ @Nullable
+ public MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
+ for (MultiTaskSlot multiTaskSlot :
unresolvedRootSlots.values()) {
+ if (!multiTaskSlot.contains(groupId)) {
+ return multiTaskSlot;
+ }
+ }
+
+ return null;
+ }
+
+ //
------------------------------------------------------------------------
+ // Inner classes: TaskSlot hierarchy and helper classes
+ //
------------------------------------------------------------------------
+
+ /**
+ * Helper class which contains a {@link MultiTaskSlot} and its {@link
Locality}.
+ */
+ public static final class MultiTaskSlotLocality {
+ private final MultiTaskSlot multiTaskSlot;
+
+ private final Locality locality;
+
+ public MultiTaskSlotLocality(MultiTaskSlot multiTaskSlot,
Locality locality) {
+ this.multiTaskSlot =
Preconditions.checkNotNull(multiTaskSlot);
+ this.locality = Preconditions.checkNotNull(locality);
+ }
+
+ public MultiTaskSlot getMultiTaskSlot() {
+ return multiTaskSlot;
+ }
+
+ public Locality getLocality() {
+ return locality;
+ }
+
+ public static MultiTaskSlotLocality of(MultiTaskSlot
multiTaskSlot, Locality locality) {
+ return new MultiTaskSlotLocality(multiTaskSlot,
locality);
+ }
+ }
+
+ /**
+ * Base class for all task slots.
+ */
+ public abstract static class TaskSlot {
+ // every TaskSlot has an associated slot request id
+ private final SlotRequestId slotRequestId;
+
+ // all task slots except for the root slots have a group id
assigned
+ @Nullable
+ private final AbstractID groupId;
+
+ protected TaskSlot(SlotRequestId slotRequestId, @Nullable
AbstractID groupId) {
+ this.slotRequestId =
Preconditions.checkNotNull(slotRequestId);
+ this.groupId = groupId;
+ }
+
+ public SlotRequestId getSlotRequestId() {
+ return slotRequestId;
+ }
+
+ @Nullable
+ public AbstractID getGroupId() {
+ return groupId;
+ }
+
+ /**
+ * Check whether the task slot contains the given groupId.
+ *
+ * @param groupId which to check whether it is contained
+ * @return true if the task slot contains the given groupId,
otherwise false
+ */
+ public boolean contains(AbstractID groupId) {
+ return Objects.equals(this.groupId, groupId);
+ }
+
+ /**
+ * Release the task slot.
+ *
+ * @param cause for the release
+ * @return true if the slot could be released, otherwise false
+ */
+ public abstract boolean release(Throwable cause);
+ }
+
+ /**
+ * {@link TaskSlot} implementation which can have multiple other task
slots assigned as children.
+ */
+ public final class MultiTaskSlot extends TaskSlot implements
AllocatedSlot.Payload {
+
+ private final Map<AbstractID, TaskSlot> children;
+
+ // the root node has its parent set to null
+ @Nullable
+ private final MultiTaskSlot parent;
+
+ // underlying allocated slot
+ private final CompletableFuture<SlotContext> slotContextFuture;
+
+ // slot request id of the allocated slot
+ @Nullable
+ private final SlotRequestId allocatedSlotRequestId;
+
+ // true if we are currently releasing our children
+ private boolean releasingChildren;
+
+ private MultiTaskSlot(
+ SlotRequestId slotRequestId,
+ AbstractID groupId,
+ MultiTaskSlot parent) {
+ this(
+ slotRequestId,
+ groupId,
+ Preconditions.checkNotNull(parent),
+ parent.getSlotContextFuture(),
+ null);
+ }
+
+ private MultiTaskSlot(
+ SlotRequestId slotRequestId,
+ CompletableFuture<SlotContext>
slotContextFuture,
+ SlotRequestId allocatedSlotRequestId) {
+ this(
+ slotRequestId,
+ null,
+ null,
+ slotContextFuture,
+ allocatedSlotRequestId);
+ }
+
+ private MultiTaskSlot(
+ SlotRequestId slotRequestId,
+ @Nullable AbstractID groupId,
+ MultiTaskSlot parent,
+ CompletableFuture<SlotContext>
slotContextFuture,
+ SlotRequestId allocatedSlotRequestId) {
+ super(slotRequestId, groupId);
+
+ this.parent = parent;
+ this.slotContextFuture =
Preconditions.checkNotNull(slotContextFuture);
+ this.allocatedSlotRequestId = allocatedSlotRequestId;
+
+ this.children = new HashMap<>(16);
+ this.releasingChildren = false;
+
+ slotContextFuture.whenComplete(
+ (SlotContext ignored, Throwable throwable) -> {
+ if (throwable != null) {
+ release(throwable);
+ }
+ });
+ }
+
+ public CompletableFuture<SlotContext> getSlotContextFuture() {
+ return slotContextFuture;
+ }
+
+ /**
+ * Allocates a {@link MultiTaskSlot} and registers it under the
given groupId at
+ * this {@link MultiTaskSlot}.
+ *
+ * @param slotRequestId of the new multi task slot
+ * @param groupId under which the new multi task slot is
registered
+ * @return the newly allocated {@link MultiTaskSlot}
+ */
+ MultiTaskSlot allocateMultiTaskSlot(SlotRequestId
slotRequestId, AbstractID groupId) {
+ Preconditions.checkState(!super.contains(groupId));
+
+ final MultiTaskSlot inner = new MultiTaskSlot(
+ slotRequestId,
+ groupId,
+ this);
+
+ children.put(groupId, inner);
+
+ // register the newly allocated slot also at the
SlotSharingManager
+ allTaskSlots.put(slotRequestId, inner);
+
+ return inner;
+ }
+
+ /**
+ * Allocates a {@link SingleTaskSlot} and registeres it under
the given groupId at
+ * this {@link MultiTaskSlot}.
+ *
+ * @param slotRequestId of the new single task slot
+ * @param groupId under which the new single task slot is
registered
+ * @param locality of the allocation
+ * @return the newly allocated {@link SingleTaskSlot}
+ */
+ SingleTaskSlot allocateSingleTaskSlot(
+ SlotRequestId slotRequestId,
+ AbstractID groupId,
+ Locality locality) {
+ Preconditions.checkState(!super.contains(groupId));
+
+ final SingleTaskSlot leave = new SingleTaskSlot(
+ slotRequestId,
+ groupId,
+ this,
+ locality);
+
+ children.put(groupId, leave);
+
+ // register the newly allocated slot also at the
SlotSharingManager
+ allTaskSlots.put(slotRequestId, leave);
+
+ return leave;
+ }
+
+ /**
+ * Checks whether this slot or any of its children contains the
given groupId.
+ *
+ * @param groupId which to check whether it is contained
+ * @return true if this or any of its children contains the
given groupId, otherwise false
+ */
+ @Override
+ public boolean contains(AbstractID groupId) {
+ if (super.contains(groupId)) {
+ return true;
+ } else {
+ for (TaskSlot taskSlot : children.values()) {
+ if (taskSlot.contains(groupId)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+ }
+
+ @Override
+ public boolean release(Throwable cause) {
+ releasingChildren = true;
+
+ // first release all children and remove them if they
could be released immediately
+ children.values().removeIf(node -> {
+ boolean release = node.release(cause);
+
+ if (release) {
+ allTaskSlots.remove(node.slotRequestId);
+ }
+
+ return release;
+ });
+
+ releasingChildren = false;
+
+ if (children.isEmpty()) {
+ if (parent != null) {
+ // we remove ourselves from our parent
if we no longer have children
+ parent.releaseChild(getGroupId());
+ } else {
+ // we are the root node --> remove the
root node from the list of task slots
+ allTaskSlots.remove(getSlotRequestId());
+
+ if (!slotContextFuture.isDone() ||
slotContextFuture.isCompletedExceptionally()) {
+ // the root node should still
be unresolved
+
unresolvedRootSlots.remove(getSlotRequestId());
+ } else {
+ // the root node should be
resolved --> we can access the slot context
+ final SlotContext slotContext =
slotContextFuture.getNow(null);
+
+ if (slotContext != null) {
+ final
Set<MultiTaskSlot> multiTaskSlots =
resolvedRootSlots.get(slotContext.getTaskManagerLocation());
+
+ if (multiTaskSlots !=
null) {
+
multiTaskSlots.remove(this);
+
+ if
(multiTaskSlots.isEmpty()) {
+
resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
+ }
+ }
+ }
+ }
+
+ // release the underlying allocated slot
+
allocatedSlotActions.releaseSlot(allocatedSlotRequestId, null, cause);
+ }
+
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Releases the child with the given childGroupId.
+ *
+ * @param childGroupId identifying the child to release
+ */
+ private void releaseChild(AbstractID childGroupId) {
+ if (!releasingChildren) {
+ TaskSlot child = children.remove(childGroupId);
+
+ if (child != null) {
+
allTaskSlots.remove(child.getSlotRequestId());
+ }
+
+ if (children.isEmpty()) {
+ release(new FlinkException("Release
multi task slot because all children have been released."));
+ }
+ }
+ }
+ }
+
+ /**
+ * {@link TaskSlot} implementation which harbours a {@link
LogicalSlot}. The {@link SingleTaskSlot}
+ * cannot have any children assigned.
+ */
+ public final class SingleTaskSlot extends TaskSlot {
+ private final MultiTaskSlot parent;
+
+ // future containing a LogicalSlot which is completed once the
underlying SlotContext future is completed
+ private final CompletableFuture<LogicalSlot> logicalSlotFuture;
+
+ private SingleTaskSlot(
+ SlotRequestId slotRequestId,
+ AbstractID groupId,
+ MultiTaskSlot parent,
+ Locality locality) {
+ super(slotRequestId, groupId);
+
+ this.parent = Preconditions.checkNotNull(parent);
+
+ Preconditions.checkNotNull(locality);
+ logicalSlotFuture = parent.getSlotContextFuture()
+ .thenApply(
+ (SlotContext slotContext) ->
+ new SingleLogicalSlot(
+ slotRequestId,
+ slotContext,
+ slotSharingGroupId,
+ locality,
+ slotOwner));
+ }
+
+ public CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
+ return logicalSlotFuture;
+ }
+
+ @Override
+ public boolean release(Throwable cause) {
+ logicalSlotFuture.completeExceptionally(cause);
+
+ boolean pendingLogicalSlotRelease = false;
+
+ if (logicalSlotFuture.isDone() &&
!logicalSlotFuture.isCompletedExceptionally()) {
+ // we have a single task slot which we first
have to release
+ final LogicalSlot logicalSlot =
logicalSlotFuture.getNow(null);
+
+ if (logicalSlot != null &&
logicalSlot.isAlive()) {
+ pendingLogicalSlotRelease =
logicalSlot.releaseSlot(cause).isDone();
+ }
+ }
+
+ if (!pendingLogicalSlotRelease) {
+ parent.releaseChild(getGroupId());
+ }
+
+ return !pendingLogicalSlotRelease;
+ }
+ }
+
+ //
------------------------------------------------------------------------
+ // Methods and classes for testing
+ //
------------------------------------------------------------------------
+
+ /**
+ * Returns a collection of all resolved root slots.
+ *
+ * @return Collection of all resolved root slots
+ */
+ @VisibleForTesting
+ public Collection<MultiTaskSlot> getResolvedRootSlots() {
+ ResolvedRootSlotValues vs = resolvedMultiTaskSlotValues;
+
+ if (vs == null ){
+ vs = new ResolvedRootSlotValues();
+ resolvedMultiTaskSlotValues = vs;
+ }
+
+ return vs;
+ }
+
+ @VisibleForTesting
+ Collection<MultiTaskSlot> getUnresolvedRootSlots() {
+ return unresolvedRootSlots.values();
+ }
+
+ /**
+ * Collection of all resolved {@link MultiTaskSlot} root slots.
+ */
+ private final class ResolvedRootSlotValues extends
AbstractCollection<MultiTaskSlot> {
+
+ @Override
+ public Iterator<MultiTaskSlot> iterator() {
+ return new
ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
+ }
+
+ @Override
+ public int size() {
+ int numberResolvedMultiTaskSlots = 0;
+
+ for (Set<MultiTaskSlot> multiTaskSlots :
resolvedRootSlots.values()) {
+ numberResolvedMultiTaskSlots +=
multiTaskSlots.size();
+ }
+
+ return numberResolvedMultiTaskSlots;
+ }
+ }
+
+ /**
+ * Iterator over all resolved {@link MultiTaskSlot} root slots.
+ */
+ private static final class ResolvedRootSlotIterator implements
Iterator<MultiTaskSlot> {
+ private final Iterator<Set<MultiTaskSlot>> baseIterator;
+ private Iterator<MultiTaskSlot> currentIterator;
+
+ private ResolvedRootSlotIterator(Iterator<Set<MultiTaskSlot>>
baseIterator) {
+ this.baseIterator =
Preconditions.checkNotNull(baseIterator);
+
+ if (baseIterator.hasNext()) {
+ currentIterator =
baseIterator.next().iterator();
+ } else {
+ currentIterator = Collections.emptyIterator();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ progressToNextElement();
+
+ return currentIterator.hasNext();
+ }
+
+ @Override
+ public MultiTaskSlot next() {
+ progressToNextElement();
+
+ return currentIterator.next();
+ }
+
+ private void progressToNextElement() {
+ while(baseIterator.hasNext() && !
currentIterator.hasNext()) {
--- End diff --
nit: missing space between `while` and `(`. Space after `!`.
---