[ 
https://issues.apache.org/jira/browse/FLINK-20364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17239054#comment-17239054
 ] 

Guruh Fajar Samudra commented on FLINK-20364:
---------------------------------------------

Github user GJL commented on a diff in the pull request:

[https://github.com/apache/flink/pull/5091#discussion_r155503866]

— Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
 —
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * [http://www.apache.org/licenses/LICENSE-2.0]
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static 
org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class SchedulerTest extends TestLogger {
+
+ @Test
+ public void testAddAndRemoveInstance() {
+ try {
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+
+ Instance i1 = getRandomInstance(2);
+ Instance i2 = getRandomInstance(2);
+ Instance i3 = getRandomInstance(2);
+
+ assertEquals(0, scheduler.getNumberOfAvailableInstances());
+ assertEquals(0, scheduler.getNumberOfAvailableSlots());
+ scheduler.newInstanceAvailable(i1);
+ assertEquals(1, scheduler.getNumberOfAvailableInstances());
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+ scheduler.newInstanceAvailable(i2);
+ assertEquals(2, scheduler.getNumberOfAvailableInstances());
+ assertEquals(4, scheduler.getNumberOfAvailableSlots());
+ scheduler.newInstanceAvailable(i3);
+ assertEquals(3, scheduler.getNumberOfAvailableInstances());
+ assertEquals(6, scheduler.getNumberOfAvailableSlots());
+
+ // cannot add available instance again
+ try
{ + scheduler.newInstanceAvailable(i2); + fail("Scheduler accepted instance 
twice"); + }
+ catch (IllegalArgumentException e)
{ + // bueno! + }
+
+ // some instances die
+ assertEquals(3, scheduler.getNumberOfAvailableInstances());
+ assertEquals(6, scheduler.getNumberOfAvailableSlots());
+ scheduler.instanceDied(i2);
+ assertEquals(2, scheduler.getNumberOfAvailableInstances());
+ assertEquals(4, scheduler.getNumberOfAvailableSlots());
+
+ // try to add a dead instance
+ try
{ + scheduler.newInstanceAvailable(i2); + fail("Scheduler accepted dead 
instance"); + }
+ catch (IllegalArgumentException e) {
+ // stimmt
— End diff –

😃
Permalink
[~githubbot] added a comment - 07/Dec/17 12:10
Github user GJL commented on a diff in the pull request:

[https://github.com/apache/flink/pull/5091#discussion_r155503994]

— Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
 —
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * [http://www.apache.org/licenses/LICENSE-2.0]
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static 
org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class SchedulerTest extends TestLogger {
+
+ @Test
+ public void testAddAndRemoveInstance() {
+ try {
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+
+ Instance i1 = getRandomInstance(2);
+ Instance i2 = getRandomInstance(2);
+ Instance i3 = getRandomInstance(2);
+
+ assertEquals(0, scheduler.getNumberOfAvailableInstances());
+ assertEquals(0, scheduler.getNumberOfAvailableSlots());
+ scheduler.newInstanceAvailable(i1);
+ assertEquals(1, scheduler.getNumberOfAvailableInstances());
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+ scheduler.newInstanceAvailable(i2);
+ assertEquals(2, scheduler.getNumberOfAvailableInstances());
+ assertEquals(4, scheduler.getNumberOfAvailableSlots());
+ scheduler.newInstanceAvailable(i3);
+ assertEquals(3, scheduler.getNumberOfAvailableInstances());
+ assertEquals(6, scheduler.getNumberOfAvailableSlots());
+
+ // cannot add available instance again
+ try
{ + scheduler.newInstanceAvailable(i2); + fail("Scheduler accepted instance 
twice"); + }
+ catch (IllegalArgumentException e)
{ + // bueno! + }
+
+ // some instances die
+ assertEquals(3, scheduler.getNumberOfAvailableInstances());
+ assertEquals(6, scheduler.getNumberOfAvailableSlots());
+ scheduler.instanceDied(i2);
+ assertEquals(2, scheduler.getNumberOfAvailableInstances());
+ assertEquals(4, scheduler.getNumberOfAvailableSlots());
+
+ // try to add a dead instance
+ try
{ + scheduler.newInstanceAvailable(i2); + fail("Scheduler accepted dead 
instance"); + }
+ catch (IllegalArgumentException e)
{ + // stimmt + + }
+
+ scheduler.instanceDied(i1);
+ assertEquals(1, scheduler.getNumberOfAvailableInstances());
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+ scheduler.instanceDied(i3);
+ assertEquals(0, scheduler.getNumberOfAvailableInstances());
+ assertEquals(0, scheduler.getNumberOfAvailableSlots());
+
+ assertFalse(i1.isAlive());
+ assertFalse(i2.isAlive());
+ assertFalse(i3.isAlive());
+ }
+ catch (Exception e) {
— End diff –

It would be better to propagate the exception instead of catching it, but I guess this file was copy-pasted.
Permalink
[~githubbot] added a comment - 07/Dec/17 12:26
Github user GJL commented on a diff in the pull request:

[https://github.com/apache/flink/pull/5091#discussion_r155507294]

— Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 —
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * [http://www.apache.org/licenses/LICENSE-2.0]
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to run 
different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * <p>The SlotSharingManager allows to create a hierarchy of
{@link TaskSlot} such that
+ * every \{@link TaskSlot}
is uniquely identified by a
{@link SlotRequestId} identifying
+ * the request for the TaskSlot and a \{@link AbstractID} identifying the task 
or the
+ * co-location constraint running in this slot.
+ *
+ * <p>The \{@link TaskSlot} hierarchy is implemented by \{@link MultiTaskSlot} 
and
+ * \{@link SingleTaskSlot}. The former class represents inner nodes which can 
contain
+ * a number of other \{@link TaskSlot} and the latter class represents the 
leave nodes.
+ * The hierarchy starts with a root \{@link MultiTaskSlot} which is a future
+ * \{@link SlotContext} assigned. The \{@link SlotContext} represents the 
allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A \{@link 
MultiTaskSlot}
+ * can be assigned multiple \{@link SingleTaskSlot} or \{@link MultiTaskSlot} 
if and only if
+ * the task slot does not yet contain another child with the same \{@link 
AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * <p>Normal slot sharing is represented by a root \{@link MultiTaskSlot} 
which contains a set
+ * of \{@link SingleTaskSlot} on the second layer. Each \{@link 
SingleTaskSlot} represents a different
+ * task.
+ *
+ * <p>Co-location constraints are modeled by adding a \{@link MultiTaskSlot} 
to the root node. The co-location
+ * constraint is uniquely identified by a \{@link AbstractID} such that we 
cannot add a second co-located
+ * \{@link MultiTaskSlot} to the same root node. Now all co-located tasks will 
be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+ private final SlotSharingGroupId slotSharingGroupId;
+
+ // needed to release allocated slots after a complete multi task slot 
hierarchy has been released
+ private final AllocatedSlotActions allocatedSlotActions;
+
+ // owner of the slots to which to return them when they are released from the 
outside
+ private final SlotOwner slotOwner;
+
+ private final Map<SlotRequestId, TaskSlot> allTaskSlots;
+
+ // Root nodes which have not been completed because the allocated slot is 
still pending
+ private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
+
+ // Root nodes which have been completed (the underlying allocated slot has 
been assigned)
+ private final Map<TaskManagerLocation, Set<MultiTaskSlot>> resolvedRootSlots;
+
+ // Internal class to iterate over all resolved root slots
+ private ResolvedRootSlotValues resolvedMultiTaskSlotValues;
+
+ public SlotSharingManager(
+ SlotSharingGroupId slotSharingGroupId,
+ AllocatedSlotActions allocatedSlotActions,
+ SlotOwner slotOwner) \{ + this.slotSharingGroupId = 
Preconditions.checkNotNull(slotSharingGroupId); + this.allocatedSlotActions = 
Preconditions.checkNotNull(allocatedSlotActions); + this.slotOwner = 
Preconditions.checkNotNull(slotOwner); + + allTaskSlots = new HashMap<>(16); + 
unresolvedRootSlots = new HashMap<>(16); + resolvedRootSlots = new 
HashMap<>(16); + + resolvedMultiTaskSlotValues = null; + }
+
+ public boolean isEmpty() \{ + return allTaskSlots.isEmpty(); + }
+
+ public boolean contains(SlotRequestId slotRequestId) \{ + return 
allTaskSlots.containsKey(slotRequestId); + }
+
+ @Nullable
+ public TaskSlot getTaskSlot(SlotRequestId slotRequestId) \{ + return 
allTaskSlots.get(slotRequestId); + }
+
+ /**
+ * Creates a new root slot with the given \{@link SlotRequestId}
,
{@link SlotContext}
future and
+ * the
{@link SlotRequestId}
of the allocated slot.
+ *
+ * @param slotRequestId of the root slot
+ * @param slotContextFuture with which we create the root slot
+ * @param allocatedSlotRequestId slot request id of the underlying allocated 
slot which can be used
+ * to cancel the pending slot request or release the allocated slot
+ * @return New root slot
+ */
+ public MultiTaskSlot createRootSlot(
+ SlotRequestId slotRequestId,
+ CompletableFuture<SlotContext> slotContextFuture,
+ SlotRequestId allocatedSlotRequestId) {
+ final MultiTaskSlot rootMultiTaskSlot = new MultiTaskSlot(
+ slotRequestId,
+ slotContextFuture,
+ allocatedSlotRequestId);
+
+ allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
+ unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
+
+ // add the root node to the set of resolved root nodes once the SlotContext 
future has been completed
+ // and we know the slot's TaskManagerLocation
+ slotContextFuture.whenComplete(
+ (SlotContext slotContext, Throwable throwable) -> {
+ if (slotContext != null) {
+ final MultiTaskSlot resolvedRootNode = 
unresolvedRootSlots.remove(slotRequestId);
+
+ if (resolvedRootNode != null)
{ + final Set<MultiTaskSlot> innerCollection = 
resolvedRootSlots.computeIfAbsent( + slotContext.getTaskManagerLocation(), + 
taskManagerLocation -> new HashSet<>(4)); + + 
innerCollection.add(resolvedRootNode); + }
+ } else
{ + rootMultiTaskSlot.release(throwable); + }
+ });
+
+ return rootMultiTaskSlot;
+ }
+
+ /**
+ * Gets a resolved root slot which does not yet contain the given groupId. 
First the given set of
+ * preferred locations is checked.
+ *
+ * @param groupId which the returned slot must not contain
+ * @param locationPreferences specifying which locations are preferred
+ * @return the resolved root slot and its locality wrt to the specified 
location preferences
+ * or null if there was no root slot which did not contain the given groupId
+ */
+ @Nullable
+ public MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, 
Collection<TaskManagerLocation> locationPreferences) {
+ Preconditions.checkNotNull(locationPreferences);
+
+ if (locationPreferences.isEmpty())
{ + return getResolvedRootSlotWithoutLocationPreferences(groupId); + }
else
{ + return getResolvedRootSlotWithLocationPreferences(groupId, 
locationPreferences); + }
+ }
+
+ /**
+ * Gets a resolved root slot which does not yet contain the given groupId. The 
method will try to
+ * find a slot of a TaskManager contained in the collection of preferred 
locations. If there is no such slot
+ * with free capacities available, then the method will look for slots of 
TaskManager which run on the same
+ * machine as the TaskManager in the collection of preferred locations. If 
there is no such slot, then any slot
+ * with free capacities is returned. If there is no such slot, then null is 
returned.
+ *
+ * @param groupId which the returned slot must not contain
+ * @param locationPreferences specifying which locations are preferred
+ * @return the resolved root slot and its locality wrt to the specified 
location preferences
+ * or null if there was not root slot which did not contain the given groupId
+ */
+ @Nullable
+ private MultiTaskSlotLocality 
getResolvedRootSlotWithLocationPreferences(AbstractID groupId, 
Collection<TaskManagerLocation> locationPreferences) {
+ Preconditions.checkNotNull(groupId);
+ Preconditions.checkNotNull(locationPreferences);
+ final Set<String> hostnameSet = new HashSet<>();
+
+ for (TaskManagerLocation locationPreference : locationPreferences) {
+ final Set<MultiTaskSlot> multiTaskSlots = 
resolvedRootSlots.get(locationPreference);
+
+ if (multiTaskSlots != null) {
+ for (MultiTaskSlot multiTaskSlot : multiTaskSlots) {
+ if (!multiTaskSlot.contains(groupId))
{ + return MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL); + }
+ }
+
+ hostnameSet.add(locationPreference.getHostname());
+ }
+ }
+
+ MultiTaskSlot nonLocalMultiTaskSlot = null;
+
+ for (Map.Entry<TaskManagerLocation, Set<MultiTaskSlot>> 
taskManagerLocationSetEntry : resolvedRootSlots.entrySet()) {
+ if (hostnameSet.contains(taskManagerLocationSetEntry.getKey().getHostname())) 
{
+ for (MultiTaskSlot multiTaskSlot : taskManagerLocationSetEntry.getValue()) {
+ if (!multiTaskSlot.contains(groupId))
{ + return MultiTaskSlotLocality.of(multiTaskSlot, Locality.HOST_LOCAL); + }
+ }
+ } else if (nonLocalMultiTaskSlot == null) {
+ for (MultiTaskSlot multiTaskSlot : taskManagerLocationSetEntry.getValue()) {
+ if (!multiTaskSlot.contains(groupId))
{ + nonLocalMultiTaskSlot = multiTaskSlot; + }
+ }
+ }
+ }
+
+ if (nonLocalMultiTaskSlot != null)
{ + return MultiTaskSlotLocality.of(nonLocalMultiTaskSlot, Locality.NON_LOCAL); 
+ }
else
{ + return null; + }
+ }
+
+ /**
+ * Gets a resolved slot which does not yet contain the given groupId without 
any location
+ * preferences.
+ *
+ * @param groupId which the returned slot must not contain
+ * @return the resolved slot or null if there was no root slot with free 
capacities
+ */
+ @Nullable
+ private MultiTaskSlotLocality 
getResolvedRootSlotWithoutLocationPreferences(AbstractID groupId) {
+ Preconditions.checkNotNull(groupId);
+
+ for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) {
+ for (MultiTaskSlot multiTaskSlot : multiTaskSlots) {
+ if (!multiTaskSlot.contains(groupId))
{ + return MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNCONSTRAINED); + }
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Gets an unresolved slot which does not yet contain the given groupId. An 
unresolved
+ * slot is a slot whose underlying allocated slot has not been allocated yet.
+ *
+ * @param groupId which the returned slot must not contain
+ * @return the unresolved slot or null if there was no root slot with free 
capacities
+ */
+ @Nullable
+ public MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
+ for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) {
+ if (!multiTaskSlot.contains(groupId))
{ + return multiTaskSlot; + }
+ }
+
+ return null;
+ }
+
+ // ------------------------------------------------------------------------
+ // Inner classes: TaskSlot hierarchy and helper classes
+ // ------------------------------------------------------------------------
+
+ /**
+ * Helper class which contains a
{@link MultiTaskSlot} and its \{@link Locality}.
+ */
+ public static final class MultiTaskSlotLocality {
+ private final MultiTaskSlot multiTaskSlot;
+
+ private final Locality locality;
+
+ public MultiTaskSlotLocality(MultiTaskSlot multiTaskSlot, Locality locality) 
\{ + this.multiTaskSlot = Preconditions.checkNotNull(multiTaskSlot); + 
this.locality = Preconditions.checkNotNull(locality); + }
+
+ public MultiTaskSlot getMultiTaskSlot() \{ + return multiTaskSlot; + }
+
+ public Locality getLocality() \{ + return locality; + }
+
+ public static MultiTaskSlotLocality of(MultiTaskSlot multiTaskSlot, Locality 
locality) \{ + return new MultiTaskSlotLocality(multiTaskSlot, locality); + }
+ }
+
+ /**
+ * Base class for all task slots.
+ */
+ public abstract static class TaskSlot {
+ // every TaskSlot has an associated slot request id
+ private final SlotRequestId slotRequestId;
+
+ // all task slots except for the root slots have a group id assigned
+ @Nullable
+ private final AbstractID groupId;
+
+ protected TaskSlot(SlotRequestId slotRequestId, @Nullable AbstractID groupId) 
\{ + this.slotRequestId = Preconditions.checkNotNull(slotRequestId); + 
this.groupId = groupId; + }
+
+ public SlotRequestId getSlotRequestId() \{ + return slotRequestId; + }
+
+ @Nullable
+ public AbstractID getGroupId() \{ + return groupId; + }
+
+ /**
+ * Check whether the task slot contains the given groupId.
+ *
+ * @param groupId which to check whether it is contained
+ * @return true if the task slot contains the given groupId, otherwise false
+ */
+ public boolean contains(AbstractID groupId) \{ + return 
Objects.equals(this.groupId, groupId); + }
+
+ /**
+ * Release the task slot.
+ *
+ * @param cause for the release
+ * @return true if the slot could be released, otherwise false
+ */
+ public abstract boolean release(Throwable cause);
+ }
+
+ /**
+ * \{@link TaskSlot} implementation which can have multiple other task slots 
assigned as children.
+ */
+ public final class MultiTaskSlot extends TaskSlot implements 
AllocatedSlot.Payload {
+
+ private final Map<AbstractID, TaskSlot> children;
+
+ // the root node has its parent set to null
+ @Nullable
+ private final MultiTaskSlot parent;
+
+ // underlying allocated slot
+ private final CompletableFuture<SlotContext> slotContextFuture;
+
+ // slot request id of the allocated slot
+ @Nullable
+ private final SlotRequestId allocatedSlotRequestId;
+
+ // true if we are currently releasing our children
+ private boolean releasingChildren;
+
+ private MultiTaskSlot(
+ SlotRequestId slotRequestId,
+ AbstractID groupId,
+ MultiTaskSlot parent) \{ + this( + slotRequestId, + groupId, + 
Preconditions.checkNotNull(parent), + parent.getSlotContextFuture(), + null); + 
}
+
+ private MultiTaskSlot(
+ SlotRequestId slotRequestId,
+ CompletableFuture<SlotContext> slotContextFuture,
+ SlotRequestId allocatedSlotRequestId) \{ + this( + slotRequestId, + null, + 
null, + slotContextFuture, + allocatedSlotRequestId); + }
+
+ private MultiTaskSlot(
+ SlotRequestId slotRequestId,
+ @Nullable AbstractID groupId,
+ MultiTaskSlot parent,
+ CompletableFuture<SlotContext> slotContextFuture,
+ SlotRequestId allocatedSlotRequestId) {
+ super(slotRequestId, groupId);
+
+ this.parent = parent;
+ this.slotContextFuture = Preconditions.checkNotNull(slotContextFuture);
+ this.allocatedSlotRequestId = allocatedSlotRequestId;
+
+ this.children = new HashMap<>(16);
+ this.releasingChildren = false;
+
+ slotContextFuture.whenComplete(
+ (SlotContext ignored, Throwable throwable) -> {
+ if (throwable != null) \{ + release(throwable); + }
+ });
+ }
+
+ public CompletableFuture<SlotContext> getSlotContextFuture() \{ + return 
slotContextFuture; + }
+
+ /**
+ * Allocates a \{@link MultiTaskSlot}
and registers it under the given groupId at
+ * this
{@link MultiTaskSlot}.
+ *
+ * @param slotRequestId of the new multi task slot
+ * @param groupId under which the new multi task slot is registered
+ * @return the newly allocated \{@link MultiTaskSlot}
+ */
+ MultiTaskSlot allocateMultiTaskSlot(SlotRequestId slotRequestId, AbstractID 
groupId)
{ + Preconditions.checkState(!super.contains(groupId)); + + final MultiTaskSlot 
inner = new MultiTaskSlot( + slotRequestId, + groupId, + this); + + 
children.put(groupId, inner); + + // register the newly allocated slot also at 
the SlotSharingManager + allTaskSlots.put(slotRequestId, inner); + + return 
inner; + }
+
+ /**
+ * Allocates a
{@link SingleTaskSlot} and registeres it under the given groupId at
+ * this \{@link MultiTaskSlot}.
+ *
+ * @param slotRequestId of the new single task slot
+ * @param groupId under which the new single task slot is registered
+ * @param locality of the allocation
+ * @return the newly allocated \{@link SingleTaskSlot}
+ */
+ SingleTaskSlot allocateSingleTaskSlot(
+ SlotRequestId slotRequestId,
+ AbstractID groupId,
+ Locality locality)
{ + Preconditions.checkState(!super.contains(groupId)); + + final 
SingleTaskSlot leave = new SingleTaskSlot( + slotRequestId, + groupId, + this, 
+ locality); + + children.put(groupId, leave); + + // register the newly 
allocated slot also at the SlotSharingManager + allTaskSlots.put(slotRequestId, 
leave); + + return leave; + }
+
+ /**
+ * Checks whether this slot or any of its children contains the given groupId.
+ *
+ * @param groupId which to check whether it is contained
+ * @return true if this or any of its children contains the given groupId, 
otherwise false
+ */
+ @Override
+ public boolean contains(AbstractID groupId) {
+ if (super.contains(groupId))
{ + return true; + }
else {
+ for (TaskSlot taskSlot : children.values()) {
+ if (taskSlot.contains(groupId))
{ + return true; + }
+ }
+
+ return false;
+ }
+ }
+
+ @Override
+ public boolean release(Throwable cause) {
+ releasingChildren = true;
+
+ // first release all children and remove them if they could be released 
immediately
+ children.values().removeIf(node -> {
+ boolean release = node.release(cause);
+
+ if (release)
{ + allTaskSlots.remove(node.slotRequestId); + }
+
+ return release;
+ });
+
+ releasingChildren = false;
+
+ if (children.isEmpty()) {
+ if (parent != null)
{ + // we remove ourselves from our parent if we no longer have children + 
parent.releaseChild(getGroupId()); + }
else {
+ // we are the root node --> remove the root node from the list of task slots
+ allTaskSlots.remove(getSlotRequestId());
+
+ if (!slotContextFuture.isDone() || 
slotContextFuture.isCompletedExceptionally())
{ + // the root node should still be unresolved + 
unresolvedRootSlots.remove(getSlotRequestId()); + }
else {
+ // the root node should be resolved --> we can access the slot context
+ final SlotContext slotContext = slotContextFuture.getNow(null);
+
+ if (slotContext != null) {
+ final Set<MultiTaskSlot> multiTaskSlots = 
resolvedRootSlots.get(slotContext.getTaskManagerLocation());
+
+ if (multiTaskSlots != null) {
+ multiTaskSlots.remove(this);
+
+ if (multiTaskSlots.isEmpty())
{ + resolvedRootSlots.remove(slotContext.getTaskManagerLocation()); + }
+ }
+ }
+ }
+
+ // release the underlying allocated slot
+ allocatedSlotActions.releaseSlot(allocatedSlotRequestId, null, cause);
+ }
+
+ return true;
+ } else
{ + return false; + }
+ }
+
+ /**
+ * Releases the child with the given childGroupId.
+ *
+ * @param childGroupId identifying the child to release
+ */
+ private void releaseChild(AbstractID childGroupId) {
+ if (!releasingChildren) {
+ TaskSlot child = children.remove(childGroupId);
+
+ if (child != null)
{ + allTaskSlots.remove(child.getSlotRequestId()); + }
+
+ if (children.isEmpty())
{ + release(new FlinkException("Release multi task slot because all children 
have been released.")); + }
+ }
+ }
+ }
+
+ /**
+ *
{@link TaskSlot}
implementation which harbours a
{@link LogicalSlot}
. The
{@link SingleTaskSlot}
+ * cannot have any children assigned.
+ */
+ public final class SingleTaskSlot extends TaskSlot {
+ private final MultiTaskSlot parent;
+
+ // future containing a LogicalSlot which is completed once the underlying 
SlotContext future is completed
+ private final CompletableFuture<LogicalSlot> logicalSlotFuture;
+
+ private SingleTaskSlot(
+ SlotRequestId slotRequestId,
+ AbstractID groupId,
+ MultiTaskSlot parent,
+ Locality locality)
{ + super(slotRequestId, groupId); + + this.parent = 
Preconditions.checkNotNull(parent); + + Preconditions.checkNotNull(locality); + 
logicalSlotFuture = parent.getSlotContextFuture() + .thenApply( + (SlotContext 
slotContext) -> + new SingleLogicalSlot( + slotRequestId, + slotContext, + 
slotSharingGroupId, + locality, + slotOwner)); + }
+
+ public CompletableFuture<LogicalSlot> getLogicalSlotFuture()
{ + return logicalSlotFuture; + }
+
+ @Override
+ public boolean release(Throwable cause) {
+ logicalSlotFuture.completeExceptionally(cause);
+
+ boolean pendingLogicalSlotRelease = false;
+
+ if (logicalSlotFuture.isDone() && 
!logicalSlotFuture.isCompletedExceptionally()) {
+ // we have a single task slot which we first have to release
+ final LogicalSlot logicalSlot = logicalSlotFuture.getNow(null);
+
+ if (logicalSlot != null && logicalSlot.isAlive())
{ + pendingLogicalSlotRelease = logicalSlot.releaseSlot(cause).isDone(); + }
+ }
+
+ if (!pendingLogicalSlotRelease)
{ + parent.releaseChild(getGroupId()); + }
+
+ return !pendingLogicalSlotRelease;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Methods and classes for testing
+ // ------------------------------------------------------------------------
+
+ /**
+ * Returns a collection of all resolved root slots.
+ *
+ * @return Collection of all resolved root slots
+ */
+ @VisibleForTesting
+ public Collection<MultiTaskSlot> getResolvedRootSlots() {
+ ResolvedRootSlotValues vs = resolvedMultiTaskSlotValues;
+
+ if (vs == null )
{ + vs = new ResolvedRootSlotValues(); + resolvedMultiTaskSlotValues = vs; + }
+
+ return vs;
+ }
+
+ @VisibleForTesting
+ Collection<MultiTaskSlot> getUnresolvedRootSlots()
{ + return unresolvedRootSlots.values(); + }
+
+ /**
+ * Collection of all resolved
{@link MultiTaskSlot} root slots.
+ */
+ private final class ResolvedRootSlotValues extends 
AbstractCollection<MultiTaskSlot> {
+
+ @Override
+ public Iterator<MultiTaskSlot> iterator() \{ + return new 
ResolvedRootSlotIterator(resolvedRootSlots.values().iterator()); + }
+
+ @Override
+ public int size() {
+ int numberResolvedMultiTaskSlots = 0;
+
+ for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) \{ + 
numberResolvedMultiTaskSlots += multiTaskSlots.size(); + }
+
+ return numberResolvedMultiTaskSlots;
+ }
+ }
+
+ /**
+ * Iterator over all resolved \{@link MultiTaskSlot}
root slots.
+ */
+ private static final class ResolvedRootSlotIterator implements 
Iterator<MultiTaskSlot> {
+ private final Iterator<Set<MultiTaskSlot>> baseIterator;
+ private Iterator<MultiTaskSlot> currentIterator;
+
+ private ResolvedRootSlotIterator(Iterator<Set<MultiTaskSlot>> baseIterator) {
+ this.baseIterator = Preconditions.checkNotNull(baseIterator);
+
+ if (baseIterator.hasNext())
{ + currentIterator = baseIterator.next().iterator(); + }
else
{ + currentIterator = Collections.emptyIterator(); + }
+ }
+
+ @Override
+ public boolean hasNext()
{ + progressToNextElement(); + + return currentIterator.hasNext(); + }
+
+ @Override
+ public MultiTaskSlot next()
{ + progressToNextElement(); + + return currentIterator.next(); + }
+
+ private void progressToNextElement() {
+ while(baseIterator.hasNext() && ! currentIterator.hasNext()) {
— End diff –

nit: missing space between `while` and `(`, and there should be no space after `!`.
Permalink
[~githubbot] added a comment - 07/Dec/17 12:28
Github user GJL commented on a diff in the pull request:

[https://github.com/apache/flink/pull/5091#discussion_r155507607]

— Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 —
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * [http://www.apache.org/licenses/LICENSE-2.0]
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
— End diff –

nit: wrong import order (the imports are not sorted lexicographically). The following imports
```
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
```
should appear before the `org.apache.flink.runtime.jobmaster.LogicalSlot` import.

> Add support for scheduling with slot sharing
> --------------------------------------------
>
>                 Key: FLINK-20364
>                 URL: https://issues.apache.org/jira/browse/FLINK-20364
>             Project: Flink
>          Issue Type: Test
>          Components: Runtime / Coordination
>    Affects Versions: statefun-2.2.1
>            Reporter: Guruh Fajar Samudra
>            Priority: Major
>             Fix For: statefun-2.2.2
>
>
> To reach feature parity with the old code base, we should add support for
> scheduling with slot sharing to the SlotPool. This will also allow us to run
> all IT cases based on {{AbstractTestBase}} on the FLIP-6
> {{MiniCluster}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to