tillrohrmann commented on a change in pull request #13181:
URL: https://github.com/apache/flink/pull/13181#discussion_r480001862
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImpl.java
##########
@@ -169,44 +146,12 @@ public void cancelSlotRequest(SlotRequestId
slotRequestId, Throwable cause) {
}
}
- private void schedulePendingRequestBulkTimeoutCheck(
- final PhysicalSlotRequestBulk slotRequestBulk,
- final Time timeout) {
-
- componentMainThreadExecutor.schedule(() -> {
- final PhysicalSlotRequestBulkChecker.TimeoutCheckResult
result =
-
slotRequestBulkChecker.checkPhysicalSlotRequestBulkTimeout(slotRequestBulk,
timeout);
-
- switch (result) {
- case PENDING:
- //re-schedule the timeout check
-
schedulePendingRequestBulkTimeoutCheck(slotRequestBulk, timeout);
- break;
- case TIMEOUT:
- timeoutSlotRequestBulk(slotRequestBulk);
- break;
- default: // no action to take
- }
- }, timeout.getSize(), timeout.getUnit());
- }
-
- private void timeoutSlotRequestBulk(final PhysicalSlotRequestBulk
slotRequestBulk) {
- final Exception cause = new TimeoutException("Slot request bulk
is not fulfillable!");
- // pending requests must be canceled first otherwise they might
be fulfilled by
- // allocated slots released from this bulk
- for (SlotRequestId slotRequestId :
slotRequestBulk.getPendingRequests().keySet()) {
- cancelSlotRequest(slotRequestId, cause);
- }
- for (SlotRequestId slotRequestId :
slotRequestBulk.getFulfilledRequests().keySet()) {
- cancelSlotRequest(slotRequestId, cause);
- }
- }
-
- private Set<SlotInfo> getAllSlotInfos() {
- return Stream
- .concat(
-
slotPool.getAvailableSlotsInformation().stream(),
-
slotPool.getAllocatedSlotsInformation().stream())
- .collect(Collectors.toSet());
+ private PhysicalSlotRequestBulkImpl createPhysicalSlotRequestBulk(final
Collection<PhysicalSlotRequest> physicalSlotRequests) {
+ final PhysicalSlotRequestBulkImpl slotRequestBulk = new
PhysicalSlotRequestBulkImpl(physicalSlotRequests
+ .stream()
+ .collect(Collectors.toMap(
+ PhysicalSlotRequest::getSlotRequestId,
+ r ->
r.getSlotProfile().getPhysicalSlotResourceProfile())), this::cancelSlotRequest);
Review comment:
Nit: the formatting is a bit off here.
```suggestion
final PhysicalSlotRequestBulkImpl slotRequestBulk = new
PhysicalSlotRequestBulkImpl(
physicalSlotRequests
.stream()
.collect(Collectors.toMap(
PhysicalSlotRequest::getSlotRequestId,
r ->
r.getSlotProfile().getPhysicalSlotResourceProfile())),
this::cancelSlotRequest);
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImpl.java
##########
@@ -49,50 +43,33 @@
private static final Logger LOG =
LoggerFactory.getLogger(BulkSlotProviderImpl.class);
- private ComponentMainThreadExecutor componentMainThreadExecutor;
-
private final SlotSelectionStrategy slotSelectionStrategy;
private final SlotPool slotPool;
private final PhysicalSlotRequestBulkChecker slotRequestBulkChecker;
- BulkSlotProviderImpl(final SlotSelectionStrategy slotSelectionStrategy,
final SlotPool slotPool) {
+ BulkSlotProviderImpl(
+ final SlotSelectionStrategy slotSelectionStrategy,
+ final SlotPool slotPool,
+ final PhysicalSlotRequestBulkChecker
slotRequestBulkChecker) {
this.slotSelectionStrategy =
checkNotNull(slotSelectionStrategy);
this.slotPool = checkNotNull(slotPool);
-
- this.slotRequestBulkChecker = new
PhysicalSlotRequestBulkChecker(
- this::getAllSlotInfos,
- SystemClock.getInstance());
-
- this.componentMainThreadExecutor = new
ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
- "Scheduler is not initialized with proper main thread
executor. " +
- "Call to BulkSlotProvider.start(...)
required.");
- }
-
- @Override
- public void start(final ComponentMainThreadExecutor mainThreadExecutor)
{
- this.componentMainThreadExecutor = mainThreadExecutor;
+ this.slotRequestBulkChecker = slotRequestBulkChecker;
Review comment:
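The null checks are not consistent in this class: `slotSelectionStrategy` and
`slotPool` are wrapped in `checkNotNull`, but `slotRequestBulkChecker` is not.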
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
##########
@@ -18,131 +18,24 @@
package org.apache.flink.runtime.jobmaster.slotpool;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
-import org.apache.flink.util.clock.Clock;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * This class helps to check the status of physical slot request bulks.
+ * This class tracks a fulfil-ability timeout of a bulk of physical slot
requests.
+ *
+ * <p>The check stops when all physical slot requests of the bulk are
fulfilled by available or newly allocated slots.
+ * The bulk is fulfillable if all its physical slot requests can be fulfilled
either by available or
+ * newly allocated slots or slots which currently used by other job subtasks.
+ * The bulk gets canceled if the timeout occurs and the bulk is not
fulfillable.
+ * The timeout does not tick while the bulk is fulfillable but not fulfilled
yet.
Review comment:
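Nit: should "tick" be "trigger" here, i.e. "The timeout does not trigger while
the bulk is fulfillable but not fulfilled yet."?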
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -161,158 +173,66 @@ private SharedSlot getOrAllocateSharedSlot(
.thenCompose(slotProfile ->
slotProvider.allocatePhysicalSlot(
new
PhysicalSlotRequest(physicalSlotRequestId, slotProfile,
slotWillBeOccupiedIndefinitely)))
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
- return new SharedSlot(physicalSlotRequestId,
physicalSlotResourceProfile, group, physicalSlotFuture);
+ return new SharedSlot(
+ physicalSlotRequestId,
+ physicalSlotResourceProfile,
+ group,
+ physicalSlotFuture,
+ slotWillBeOccupiedIndefinitely,
+ this::releaseSharedSlot);
});
}
+ private void releaseSharedSlot(ExecutionSlotSharingGroup
executionSlotSharingGroup) {
+ SharedSlot slot = sharedSlots.remove(executionSlotSharingGroup);
+ if (slot != null) {
+ slotProvider.cancelSlotRequest(
+ slot.getPhysicalSlotRequestId(),
+ new FlinkException("Slot is being returned from
SlotSharingExecutionSlotAllocator."));
+ } else {
+ LOG.debug("There is no slot for
ExecutionSlotSharingGroup {} to release", executionSlotSharingGroup);
+ }
+ }
+
private ResourceProfile
getPhysicalSlotResourceProfile(ExecutionSlotSharingGroup
executionSlotSharingGroup) {
return executionSlotSharingGroup
.getExecutionVertexIds()
.stream()
.reduce(ResourceProfile.ZERO, (r, e) ->
r.merge(resourceProfileRetriever.apply(e)), ResourceProfile::merge);
}
- private class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
- private final SlotRequestId physicalSlotRequestId;
-
- private final ResourceProfile physicalSlotResourceProfile;
-
- private final ExecutionSlotSharingGroup
executionSlotSharingGroup;
-
- private final CompletableFuture<PhysicalSlot> slotContextFuture;
-
- private final DualKeyLinkedMap<ExecutionVertexID,
SlotRequestId, CompletableFuture<SingleLogicalSlot>> requestedLogicalSlots;
-
- private SharedSlot(
- SlotRequestId physicalSlotRequestId,
- ResourceProfile physicalSlotResourceProfile,
- ExecutionSlotSharingGroup
executionSlotSharingGroup,
- CompletableFuture<PhysicalSlot>
slotContextFuture) {
- this.physicalSlotRequestId = physicalSlotRequestId;
- this.physicalSlotResourceProfile =
physicalSlotResourceProfile;
- this.executionSlotSharingGroup =
executionSlotSharingGroup;
- this.slotContextFuture =
slotContextFuture.thenApply(physicalSlot -> {
- Preconditions.checkState(
- physicalSlot.tryAssignPayload(this),
- "Unexpected physical slot payload
assignment failure!");
- return physicalSlot;
+ private SharingPhysicalSlotRequestBulk
createBulk(Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executions) {
+ Map<ExecutionSlotSharingGroup, ResourceProfile> pendingRequests
= executions
+ .keySet()
+ .stream()
+ .collect(Collectors.toMap(
+ group -> group,
+ group ->
sharedSlots.get(group).getPhysicalSlotResourceProfile()
+ ));
+ Map<ExecutionSlotSharingGroup, AllocationID> fulfilledRequests
= new HashMap<>();
+ SharingPhysicalSlotRequestBulk bulk = new
SharingPhysicalSlotRequestBulk(
+ executions,
+ pendingRequests,
+ fulfilledRequests,
+ (executionVertexId, cause) -> {
+ ExecutionSlotSharingGroup group =
slotSharingStrategy.getExecutionSlotSharingGroup(executionVertexId);
+
sharedSlots.get(group).cancelLogicalSlotRequest(executionVertexId, cause);
Review comment:
Here I would suggest adding a `checkState` that `sharedSlots.get(group)`
actually returns a slot (i.e. is not null) before calling
`cancelLogicalSlotRequest` on it.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class PhysicalSlotRequestBulkCheckerImpl implements
PhysicalSlotRequestBulkChecker {
+
+ private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+ private final Supplier<Set<SlotInfo>> slotsRetriever;
+
+ private final Clock clock;
+
+ PhysicalSlotRequestBulkCheckerImpl(final Supplier<Set<SlotInfo>>
slotsRetriever, final Clock clock) {
+ this.slotsRetriever = checkNotNull(slotsRetriever);
+ this.clock = checkNotNull(clock);
+ }
+
+ void start(final ComponentMainThreadExecutor mainThreadExecutor) {
+ this.componentMainThreadExecutor = mainThreadExecutor;
+ }
+
+ @Override
+ public void
schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulk bulk, Time
timeout) {
+ PhysicalSlotRequestBulkWithTimestamp bulkWithTimestamp = new
PhysicalSlotRequestBulkWithTimestamp(bulk);
+ bulkWithTimestamp.markUnfulfillable(clock.relativeTimeMillis());
+ schedulePendingRequestBulkTimeoutCheck(bulkWithTimestamp,
timeout);
+ }
+
+ private void
schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulkWithTimestamp
bulk, Time timeout) {
+ componentMainThreadExecutor.schedule(() -> {
+ TimeoutCheckResult result =
checkPhysicalSlotRequestBulkTimeout(bulk, timeout);
+
+ switch (result) {
+ case PENDING:
+ //re-schedule the timeout check
+
schedulePendingRequestBulkTimeoutCheck(bulk, timeout);
+ break;
+ case TIMEOUT:
+ bulk.cancel(new TimeoutException("Slot
request bulk is not fulfillable!"));
+ break;
+ case FULFILLED:
+ default:
+ // no action to take
+ break;
+ }
+ }, timeout.getSize(), timeout.getUnit());
+ }
+
+ /**
+ * Check the slot request bulk and timeout its requests if it has been
unfulfillable for too long.
+ * @param slotRequestBulk bulk of slot requests
+ * @param slotRequestTimeout indicates how long a pending request can
be unfulfillable
+ * @return result of the check, indicating the bulk is fulfilled, still
pending, or timed out
+ */
+ @VisibleForTesting
+ TimeoutCheckResult checkPhysicalSlotRequestBulkTimeout(
+ final PhysicalSlotRequestBulkWithTimestamp
slotRequestBulk,
+ final Time slotRequestTimeout) {
+
+ if (slotRequestBulk.getPendingRequests().isEmpty()) {
+ return TimeoutCheckResult.FULFILLED;
+ }
+
+ final boolean fulfillable =
isSlotRequestBulkFulfillable(slotRequestBulk, slotsRetriever);
+ if (fulfillable) {
+ slotRequestBulk.markFulfillable();
+ } else {
+ final long currentTimestamp =
clock.relativeTimeMillis();
+
+ slotRequestBulk.markUnfulfillable(currentTimestamp);
+
+ final long unfulfillableSince =
slotRequestBulk.getUnfulfillableSince();
+ if (unfulfillableSince +
slotRequestTimeout.toMilliseconds() <= currentTimestamp) {
+ return TimeoutCheckResult.TIMEOUT;
+ }
+ }
+
+ return TimeoutCheckResult.PENDING;
+ }
+
+ /**
+ * Returns whether the given bulk of slot requests are possible to be
fulfilled at the same time
+ * with all the reusable slots in the slot pool. A reusable slot means
the slot is available or
+ * will not be occupied indefinitely.
+ *
+ * @param slotRequestBulk bulk of slot requests to check
+ * @param slotsRetriever supplies slots to be used for the
fulfill-ability check
+ * @return true if the slot requests are possible to be fulfilled,
otherwise false
+ */
+ @VisibleForTesting
+ static boolean isSlotRequestBulkFulfillable(
+ final PhysicalSlotRequestBulk slotRequestBulk,
+ final Supplier<Set<SlotInfo>> slotsRetriever) {
+
+ final Set<AllocationID> assignedSlots =
slotRequestBulk.getAllocationIdsOfFulfilledRequests();
+ final Set<SlotInfo> reusableSlots =
getReusableSlots(slotsRetriever, assignedSlots);
+ return
areRequestsFulfillableWithSlots(slotRequestBulk.getPendingRequests(),
reusableSlots);
+ }
+
+ private static Set<SlotInfo> getReusableSlots(
+ final Supplier<Set<SlotInfo>> slotsRetriever,
+ final Set<AllocationID> slotsToExclude) {
+
+ return slotsRetriever.get().stream()
+ .filter(slotInfo ->
!slotInfo.willBeOccupiedIndefinitely())
+ .filter(slotInfo ->
!slotsToExclude.contains(slotInfo.getAllocationId()))
+ .collect(Collectors.toSet());
+ }
+
+ private static boolean areRequestsFulfillableWithSlots(
+ final Collection<ResourceProfile>
requestResourceProfiles,
+ final Set<SlotInfo> slots) {
+
+ final Set<SlotInfo> remainingSlots = new HashSet<>(slots);
+ for (ResourceProfile requestResourceProfile :
requestResourceProfiles) {
+ final Optional<SlotInfo> matchedSlot =
findMatchingSlotForRequest(requestResourceProfile, remainingSlots);
+ if (matchedSlot.isPresent()) {
+ remainingSlots.remove(matchedSlot.get());
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static Optional<SlotInfo> findMatchingSlotForRequest(
+ final ResourceProfile requestResourceProfile,
+ final Collection<SlotInfo> slots) {
+
+ return slots.stream().filter(slot ->
slot.getResourceProfile().isMatching(requestResourceProfile)).findFirst();
+ }
+
+ static PhysicalSlotRequestBulkCheckerImpl fromSlotPool(final SlotPool
slotPool, final Clock clock) {
+ return new PhysicalSlotRequestBulkCheckerImpl(() ->
getAllSlotInfos(slotPool), clock);
+ }
+
+ private static Set<SlotInfo> getAllSlotInfos(SlotPool slotPool) {
+ return Stream
+ .concat(
+
slotPool.getAvailableSlotsInformation().stream(),
+
slotPool.getAllocatedSlotsInformation().stream())
+ .collect(Collectors.toSet());
Review comment:
I think `slotPool.getAllocatedSlotsInformation()` should already give you
all slot infos.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharingPhysicalSlotRequestBulk.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulk;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SharingPhysicalSlotRequestBulk implements PhysicalSlotRequestBulk {
+ private final Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>>
executions;
+
+ private final Map<ExecutionSlotSharingGroup, ResourceProfile>
pendingRequests;
+
+ private final Map<ExecutionSlotSharingGroup, AllocationID>
fulfilledRequests;
+
+ private final BiConsumer<ExecutionVertexID, Throwable>
logicalSlotRequestCanceller;
+
+ SharingPhysicalSlotRequestBulk(
+ Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>>
executions,
+ Map<ExecutionSlotSharingGroup, ResourceProfile>
pendingRequests,
+ Map<ExecutionSlotSharingGroup, AllocationID>
fulfilledRequests,
+ BiConsumer<ExecutionVertexID, Throwable>
logicalSlotRequestCanceller) {
+ this.executions = checkNotNull(executions);
+ this.pendingRequests = checkNotNull(pendingRequests);
+ this.fulfilledRequests = checkNotNull(fulfilledRequests);
+ this.logicalSlotRequestCanceller =
checkNotNull(logicalSlotRequestCanceller);
+ }
+
+ @Override
+ public Collection<ResourceProfile> getPendingRequests() {
+ return pendingRequests.values();
+ }
+
+ @Override
+ public Set<AllocationID> getAllocationIdsOfFulfilledRequests() {
+ return new HashSet<>(fulfilledRequests.values());
+ }
+
+ @Override
+ public void cancel(Throwable cause) {
+ // pending requests must be canceled first otherwise they might
be fulfilled by
+ // allocated slots released from this bulk
+ Stream
+ .concat(
+ pendingRequests.keySet().stream(),
+ fulfilledRequests.keySet().stream())
+ .flatMap(group -> executions.get(group).stream())
+ .forEach(id -> logicalSlotRequestCanceller.accept(id,
cause));
Review comment:
Call me old-fashioned, but I like to follow the KISS principle and use
a for-each loop here instead of wrapping everything in a stream and then
calling a non-pure function in `forEach`. I think the streams API has its value
in transforming streams into something else via pure functions, but it should
not be used for computations with side effects.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -113,27 +122,30 @@
SharedSlotProfileRetriever sharedSlotProfileRetriever =
sharedSlotProfileRetrieverFactory
.createFromBulk(new HashSet<>(executionVertexIds));
- Map<ExecutionVertexID, SlotExecutionVertexAssignment>
assignments = executionVertexIds
+ Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>>
executionsByGroup = executionVertexIds
.stream()
-
.collect(Collectors.groupingBy(slotSharingStrategy::getExecutionSlotSharingGroup))
+
.collect(Collectors.groupingBy(slotSharingStrategy::getExecutionSlotSharingGroup));
+ Map<ExecutionVertexID, SlotExecutionVertexAssignment>
assignments = executionsByGroup
.entrySet()
.stream()
.flatMap(entry ->
allocateLogicalSlotsFromSharedSlot(sharedSlotProfileRetriever, entry.getKey(),
entry.getValue()))
.collect(Collectors.toMap(SlotExecutionVertexAssignment::getExecutionVertexId,
a -> a));
Review comment:
I am not convinced that using the stream API here is the right choice.
What we are doing here is performing computations with side effects. For that
I would always prefer a traditional for-each loop.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
##########
@@ -18,131 +18,24 @@
package org.apache.flink.runtime.jobmaster.slotpool;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
-import org.apache.flink.util.clock.Clock;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * This class helps to check the status of physical slot request bulks.
+ * This class tracks a fulfil-ability timeout of a bulk of physical slot
requests.
Review comment:
Nit: I would suggest deciding whether to write `fulfil` or `fulfill`
and then spelling it consistently.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class PhysicalSlotRequestBulkCheckerImpl implements
PhysicalSlotRequestBulkChecker {
+
+ private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+ private final Supplier<Set<SlotInfo>> slotsRetriever;
+
+ private final Clock clock;
+
+ PhysicalSlotRequestBulkCheckerImpl(final Supplier<Set<SlotInfo>>
slotsRetriever, final Clock clock) {
+ this.slotsRetriever = checkNotNull(slotsRetriever);
+ this.clock = checkNotNull(clock);
+ }
+
+ void start(final ComponentMainThreadExecutor mainThreadExecutor) {
+ this.componentMainThreadExecutor = mainThreadExecutor;
+ }
+
+ @Override
+ public void
schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulk bulk, Time
timeout) {
+ PhysicalSlotRequestBulkWithTimestamp bulkWithTimestamp = new
PhysicalSlotRequestBulkWithTimestamp(bulk);
+ bulkWithTimestamp.markUnfulfillable(clock.relativeTimeMillis());
+ schedulePendingRequestBulkTimeoutCheck(bulkWithTimestamp,
timeout);
+ }
+
+ private void
schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulkWithTimestamp
bulk, Time timeout) {
+ componentMainThreadExecutor.schedule(() -> {
+ TimeoutCheckResult result =
checkPhysicalSlotRequestBulkTimeout(bulk, timeout);
+
+ switch (result) {
+ case PENDING:
+ //re-schedule the timeout check
+
schedulePendingRequestBulkTimeoutCheck(bulk, timeout);
+ break;
+ case TIMEOUT:
+ bulk.cancel(new TimeoutException("Slot
request bulk is not fulfillable!"));
+ break;
+ case FULFILLED:
+ default:
+ // no action to take
+ break;
+ }
+ }, timeout.getSize(), timeout.getUnit());
+ }
+
+ /**
+ * Check the slot request bulk and timeout its requests if it has been
unfulfillable for too long.
+ * @param slotRequestBulk bulk of slot requests
+ * @param slotRequestTimeout indicates how long a pending request can
be unfulfillable
+ * @return result of the check, indicating the bulk is fulfilled, still
pending, or timed out
+ */
+ @VisibleForTesting
+ TimeoutCheckResult checkPhysicalSlotRequestBulkTimeout(
+ final PhysicalSlotRequestBulkWithTimestamp
slotRequestBulk,
+ final Time slotRequestTimeout) {
+
+ if (slotRequestBulk.getPendingRequests().isEmpty()) {
+ return TimeoutCheckResult.FULFILLED;
+ }
+
+ final boolean fulfillable =
isSlotRequestBulkFulfillable(slotRequestBulk, slotsRetriever);
+ if (fulfillable) {
+ slotRequestBulk.markFulfillable();
+ } else {
+ final long currentTimestamp =
clock.relativeTimeMillis();
+
+ slotRequestBulk.markUnfulfillable(currentTimestamp);
+
+ final long unfulfillableSince =
slotRequestBulk.getUnfulfillableSince();
+ if (unfulfillableSince +
slotRequestTimeout.toMilliseconds() <= currentTimestamp) {
+ return TimeoutCheckResult.TIMEOUT;
+ }
+ }
+
+ return TimeoutCheckResult.PENDING;
+ }
+
+ /**
+ * Returns whether the given bulk of slot requests are possible to be
fulfilled at the same time
+ * with all the reusable slots in the slot pool. A reusable slot means
the slot is available or
+ * will not be occupied indefinitely.
+ *
+ * @param slotRequestBulk bulk of slot requests to check
+ * @param slotsRetriever supplies slots to be used for the
fulfill-ability check
+ * @return true if the slot requests are possible to be fulfilled,
otherwise false
+ */
+ @VisibleForTesting
+ static boolean isSlotRequestBulkFulfillable(
+ final PhysicalSlotRequestBulk slotRequestBulk,
+ final Supplier<Set<SlotInfo>> slotsRetriever) {
+
+ final Set<AllocationID> assignedSlots =
slotRequestBulk.getAllocationIdsOfFulfilledRequests();
+ final Set<SlotInfo> reusableSlots =
getReusableSlots(slotsRetriever, assignedSlots);
+ return
areRequestsFulfillableWithSlots(slotRequestBulk.getPendingRequests(),
reusableSlots);
+ }
+
+ private static Set<SlotInfo> getReusableSlots(
+ final Supplier<Set<SlotInfo>> slotsRetriever,
+ final Set<AllocationID> slotsToExclude) {
+
+ return slotsRetriever.get().stream()
+ .filter(slotInfo ->
!slotInfo.willBeOccupiedIndefinitely())
+ .filter(slotInfo ->
!slotsToExclude.contains(slotInfo.getAllocationId()))
+ .collect(Collectors.toSet());
+ }
+
+ private static boolean areRequestsFulfillableWithSlots(
+ final Collection<ResourceProfile>
requestResourceProfiles,
+ final Set<SlotInfo> slots) {
+
+ final Set<SlotInfo> remainingSlots = new HashSet<>(slots);
+ for (ResourceProfile requestResourceProfile :
requestResourceProfiles) {
+ final Optional<SlotInfo> matchedSlot =
findMatchingSlotForRequest(requestResourceProfile, remainingSlots);
+ if (matchedSlot.isPresent()) {
+ remainingSlots.remove(matchedSlot.get());
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static Optional<SlotInfo> findMatchingSlotForRequest(
+ final ResourceProfile requestResourceProfile,
+ final Collection<SlotInfo> slots) {
+
+ return slots.stream().filter(slot ->
slot.getResourceProfile().isMatching(requestResourceProfile)).findFirst();
+ }
+
+ static PhysicalSlotRequestBulkCheckerImpl fromSlotPool(final SlotPool
slotPool, final Clock clock) {
Review comment:
```suggestion
static PhysicalSlotRequestBulkCheckerImpl create(final SlotPool
slotPool, final Clock clock) {
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class PhysicalSlotRequestBulkCheckerImpl implements
PhysicalSlotRequestBulkChecker {
+
+ private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+ private final Supplier<Set<SlotInfo>> slotsRetriever;
+
+ private final Clock clock;
+
+ PhysicalSlotRequestBulkCheckerImpl(final Supplier<Set<SlotInfo>>
slotsRetriever, final Clock clock) {
+ this.slotsRetriever = checkNotNull(slotsRetriever);
+ this.clock = checkNotNull(clock);
+ }
+
+ void start(final ComponentMainThreadExecutor mainThreadExecutor) {
+ this.componentMainThreadExecutor = mainThreadExecutor;
+ }
+
+ @Override
+ public void
schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulk bulk, Time
timeout) {
+ PhysicalSlotRequestBulkWithTimestamp bulkWithTimestamp = new
PhysicalSlotRequestBulkWithTimestamp(bulk);
+ bulkWithTimestamp.markUnfulfillable(clock.relativeTimeMillis());
+ schedulePendingRequestBulkTimeoutCheck(bulkWithTimestamp,
timeout);
+ }
+
+ private void
schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulkWithTimestamp
bulk, Time timeout) {
+ componentMainThreadExecutor.schedule(() -> {
+ TimeoutCheckResult result =
checkPhysicalSlotRequestBulkTimeout(bulk, timeout);
+
+ switch (result) {
+ case PENDING:
+ //re-schedule the timeout check
+
schedulePendingRequestBulkTimeoutCheck(bulk, timeout);
+ break;
+ case TIMEOUT:
+ bulk.cancel(new TimeoutException("Slot
request bulk is not fulfillable!"));
+ break;
+ case FULFILLED:
+ default:
+ // no action to take
+ break;
+ }
+ }, timeout.getSize(), timeout.getUnit());
+ }
+
+ /**
+ * Check the slot request bulk and timeout its requests if it has been
unfulfillable for too long.
+ * @param slotRequestBulk bulk of slot requests
+ * @param slotRequestTimeout indicates how long a pending request can
be unfulfillable
+ * @return result of the check, indicating the bulk is fulfilled, still
pending, or timed out
+ */
+ @VisibleForTesting
+ TimeoutCheckResult checkPhysicalSlotRequestBulkTimeout(
+ final PhysicalSlotRequestBulkWithTimestamp
slotRequestBulk,
+ final Time slotRequestTimeout) {
+
+ if (slotRequestBulk.getPendingRequests().isEmpty()) {
+ return TimeoutCheckResult.FULFILLED;
+ }
+
+ final boolean fulfillable =
isSlotRequestBulkFulfillable(slotRequestBulk, slotsRetriever);
+ if (fulfillable) {
+ slotRequestBulk.markFulfillable();
+ } else {
+ final long currentTimestamp =
clock.relativeTimeMillis();
+
+ slotRequestBulk.markUnfulfillable(currentTimestamp);
+
+ final long unfulfillableSince =
slotRequestBulk.getUnfulfillableSince();
+ if (unfulfillableSince +
slotRequestTimeout.toMilliseconds() <= currentTimestamp) {
+ return TimeoutCheckResult.TIMEOUT;
+ }
+ }
+
+ return TimeoutCheckResult.PENDING;
+ }
+
+ /**
+ * Returns whether the given bulk of slot requests are possible to be
fulfilled at the same time
+ * with all the reusable slots in the slot pool. A reusable slot means
the slot is available or
+ * will not be occupied indefinitely.
+ *
+ * @param slotRequestBulk bulk of slot requests to check
+ * @param slotsRetriever supplies slots to be used for the
fulfill-ability check
+ * @return true if the slot requests are possible to be fulfilled,
otherwise false
+ */
+ @VisibleForTesting
+ static boolean isSlotRequestBulkFulfillable(
+ final PhysicalSlotRequestBulk slotRequestBulk,
+ final Supplier<Set<SlotInfo>> slotsRetriever) {
+
+ final Set<AllocationID> assignedSlots =
slotRequestBulk.getAllocationIdsOfFulfilledRequests();
+ final Set<SlotInfo> reusableSlots =
getReusableSlots(slotsRetriever, assignedSlots);
+ return
areRequestsFulfillableWithSlots(slotRequestBulk.getPendingRequests(),
reusableSlots);
+ }
+
+ private static Set<SlotInfo> getReusableSlots(
+ final Supplier<Set<SlotInfo>> slotsRetriever,
+ final Set<AllocationID> slotsToExclude) {
+
+ return slotsRetriever.get().stream()
+ .filter(slotInfo ->
!slotInfo.willBeOccupiedIndefinitely())
+ .filter(slotInfo ->
!slotsToExclude.contains(slotInfo.getAllocationId()))
+ .collect(Collectors.toSet());
+ }
+
+ private static boolean areRequestsFulfillableWithSlots(
+ final Collection<ResourceProfile>
requestResourceProfiles,
+ final Set<SlotInfo> slots) {
+
+ final Set<SlotInfo> remainingSlots = new HashSet<>(slots);
+ for (ResourceProfile requestResourceProfile :
requestResourceProfiles) {
+ final Optional<SlotInfo> matchedSlot =
findMatchingSlotForRequest(requestResourceProfile, remainingSlots);
+ if (matchedSlot.isPresent()) {
+ remainingSlots.remove(matchedSlot.get());
+ } else {
+ return false;
+ }
+ }
Review comment:
We don't have to address it right away, but we should keep in mind that
this might have quadratic complexity: for every pending request, the
remaining slots are scanned linearly to find a matching one.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
##########
@@ -98,14 +101,15 @@ public SchedulerImpl(
"Scheduler is not initialized with proper main thread
executor. " +
"Call to Scheduler.start(...) required.");
- this.bulkSlotProvider = new
BulkSlotProviderImpl(slotSelectionStrategy, slotPool);
+ this.slotRequestBulkChecker =
PhysicalSlotRequestBulkCheckerImpl.fromSlotPool(slotPool,
SystemClock.getInstance());
+ this.bulkSlotProvider = new
BulkSlotProviderImpl(slotSelectionStrategy, slotPool, slotRequestBulkChecker);
Review comment:
Looking at the code, wouldn't the change be quite small? If I am not
mistaken, it would be enough to change the `schedulerFactory` argument of
the `JobMaster` into a `SlotProviderFactory`. That way, we should be able to
provide a `SlotProvider` implementation other than `SchedulerImpl` to the
`JobMaster`, and we would not have to make sure that `SchedulerImpl`
implements all methods of the `SlotProvider` interface.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImplTest.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+import static
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotTestUtils.createPhysicalSlot;
+import static
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotTestUtils.occupyPhysicalSlot;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link PhysicalSlotRequestBulkCheckerImpl}.
+ */
+public class PhysicalSlotRequestBulkCheckerImplTest extends TestLogger {
+
+ private static final Time TIMEOUT = Time.milliseconds(100L);
Review comment:
Could this also be 50ms?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkWithTimestamp.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+import java.util.Collection;
+import java.util.Set;
+
+class PhysicalSlotRequestBulkWithTimestamp implements PhysicalSlotRequestBulk {
Review comment:
I think this class is already used in earlier commits, but it is only
added in this one. Hence, the order in which the commits are split up does
not work: the earlier commits would not compile on their own.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharingPhysicalSlotRequestBulk.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulk;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SharingPhysicalSlotRequestBulk implements PhysicalSlotRequestBulk {
+ private final Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>>
executions;
+
+ private final Map<ExecutionSlotSharingGroup, ResourceProfile>
pendingRequests;
+
+ private final Map<ExecutionSlotSharingGroup, AllocationID>
fulfilledRequests;
+
+ private final BiConsumer<ExecutionVertexID, Throwable>
logicalSlotRequestCanceller;
+
+ SharingPhysicalSlotRequestBulk(
+ Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>>
executions,
+ Map<ExecutionSlotSharingGroup, ResourceProfile>
pendingRequests,
+ Map<ExecutionSlotSharingGroup, AllocationID>
fulfilledRequests,
+ BiConsumer<ExecutionVertexID, Throwable>
logicalSlotRequestCanceller) {
+ this.executions = checkNotNull(executions);
+ this.pendingRequests = checkNotNull(pendingRequests);
+ this.fulfilledRequests = checkNotNull(fulfilledRequests);
+ this.logicalSlotRequestCanceller =
checkNotNull(logicalSlotRequestCanceller);
+ }
+
+ @Override
+ public Collection<ResourceProfile> getPendingRequests() {
+ return pendingRequests.values();
+ }
+
+ @Override
+ public Set<AllocationID> getAllocationIdsOfFulfilledRequests() {
+ return new HashSet<>(fulfilledRequests.values());
Review comment:
`Collections.unmodifiableSet` might be a bit cheaper here.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.util.DualKeyLinkedMap;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
Review comment:
A class-level JavaDoc would be good here.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImplTest.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+import static
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotTestUtils.createPhysicalSlot;
+import static
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotTestUtils.occupyPhysicalSlot;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link PhysicalSlotRequestBulkCheckerImpl}.
+ */
+public class PhysicalSlotRequestBulkCheckerImplTest extends TestLogger {
+
+ private static final Time TIMEOUT = Time.milliseconds(100L);
+
+ private static ScheduledExecutorService
singleThreadScheduledExecutorService;
+
+ private static ComponentMainThreadExecutor mainThreadExecutor;
+
+ private final ManualClock clock = new ManualClock();
+
+ private PhysicalSlotRequestBulkCheckerImpl bulkChecker;
+
+ private Set<PhysicalSlot> slots;
+
+ private Supplier<Set<SlotInfo>> slotsRetriever;
+
+ @BeforeClass
+ public static void setupClass() {
+ singleThreadScheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();
+ mainThreadExecutor =
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadScheduledExecutorService);
+ }
+
+ @AfterClass
+ public static void teardownClass() {
+ if (singleThreadScheduledExecutorService != null) {
+ singleThreadScheduledExecutorService.shutdownNow();
+ }
+ }
+
+ @Before
+ public void setup() throws Exception {
+ slots = new HashSet<>();
+ slotsRetriever = () -> new HashSet<>(slots);
+ bulkChecker = new
PhysicalSlotRequestBulkCheckerImpl(slotsRetriever, clock);
+ bulkChecker.start(mainThreadExecutor);
+ }
+
+ @Test
+ public void testPendingBulkIsNotCancelled() throws
InterruptedException, ExecutionException {
+ final CompletableFuture<SlotRequestId> cancellationFuture = new
CompletableFuture<>();
+ final PhysicalSlotRequestBulk bulk =
createPhysicalSlotRequestBulkWithCancellationFuture(cancellationFuture, new
SlotRequestId());
+ bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk,
TIMEOUT);
+ checkNotCancelledAfter(cancellationFuture, 2 *
TIMEOUT.toMilliseconds());
+ }
+
+ @Test
+ public void testFulfilledBulkIsNotCancelled() throws
InterruptedException, ExecutionException {
+ final CompletableFuture<SlotRequestId> cancellationFuture = new
CompletableFuture<>();
+ final PhysicalSlotRequestBulk bulk =
createPhysicalSlotRequestBulkWithCancellationFuture(cancellationFuture, new
SlotRequestId());
+ bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk,
TIMEOUT);
+ checkNotCancelledAfter(cancellationFuture, 2 *
TIMEOUT.toMilliseconds());
+ }
+
+ private static void checkNotCancelledAfter(
+ CompletableFuture<?> cancellationFuture, long milli)
throws ExecutionException, InterruptedException {
+ mainThreadExecutor.schedule(() -> {}, milli,
TimeUnit.MILLISECONDS).get();
+ try {
+ assertThat(cancellationFuture.isDone(), is(false));
+ cancellationFuture.get(milli, TimeUnit.MILLISECONDS);
+ fail("The future must not have been cancelled");
+ } catch (TimeoutException e) {
+ assertThat(cancellationFuture.isDone(), is(false));
+ }
+ }
+
+ @Test
+ public void testUnfulfillableBulkIsCancelled() {
+ final CompletableFuture<SlotRequestId> cancellationFuture = new
CompletableFuture<>();
+ final SlotRequestId slotRequestId = new SlotRequestId();
+ final PhysicalSlotRequestBulk bulk =
createPhysicalSlotRequestBulkWithCancellationFuture(cancellationFuture,
slotRequestId);
+ bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk,
TIMEOUT);
+ clock.advanceTime(TIMEOUT.toMilliseconds() + 1L,
TimeUnit.MILLISECONDS);
+ assertThat(cancellationFuture.join(), is(slotRequestId));
+ }
+
+ @Test
+ public void testBulkFulfilledOnCheck() {
+ final SlotRequestId slotRequestId = new SlotRequestId();
+ final PhysicalSlotRequestBulkImpl bulk =
createPhysicalSlotRequestBulk(slotRequestId);
+
+ bulk.markRequestFulfilled(slotRequestId, new AllocationID());
+
+ final PhysicalSlotRequestBulkWithTimestamp bulkWithTimestamp =
new PhysicalSlotRequestBulkWithTimestamp(bulk);
+ assertThat(checkBulkTimeout(bulkWithTimestamp),
is(PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.FULFILLED));
+ }
+
+ @Test
+ public void testBulkTimeoutOnCheck() {
+ final PhysicalSlotRequestBulkWithTimestamp bulk =
createPhysicalSlotRequestBulkWithTimestamp(new SlotRequestId());
+
+ clock.advanceTime(TIMEOUT.toMilliseconds() + 1L,
TimeUnit.MILLISECONDS);
+ assertThat(checkBulkTimeout(bulk),
is(PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.TIMEOUT));
+ }
+
+ @Test
+ public void testBulkPendingOnCheckIfFulfillable() {
+ final PhysicalSlotRequestBulkWithTimestamp bulk =
createPhysicalSlotRequestBulkWithTimestamp(new SlotRequestId());
+
+ final PhysicalSlot slot = addOneSlot();
+ occupyPhysicalSlot(slot, false);
+
+ assertThat(checkBulkTimeout(bulk),
is(PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.PENDING));
+ }
+
+ @Test
+ public void testBulkPendingOnCheckIfUnfulfillableButNotTimedOut() {
+ final PhysicalSlotRequestBulkWithTimestamp bulk =
createPhysicalSlotRequestBulkWithTimestamp(new SlotRequestId());
+
+ assertThat(checkBulkTimeout(bulk),
is(PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.PENDING));
+ }
+
+ @Test
+ public void testBulkFulfillable() {
+ final PhysicalSlotRequestBulk bulk =
createPhysicalSlotRequestBulk(new SlotRequestId());
+
+ addOneSlot();
+
+ assertThat(isFulfillable(bulk), is(true));
+ }
+
+ @Test
+ public void testBulkUnfulfillableWithInsufficientSlots() {
+ final PhysicalSlotRequestBulk bulk =
createPhysicalSlotRequestBulk(new SlotRequestId(), new SlotRequestId());
+
+ addOneSlot();
+
+ assertThat(isFulfillable(bulk), is(false));
+ }
+
+ @Test
+ public void testBulkUnfulfillableWithSlotAlreadyAssignedToBulk() {
+ final SlotRequestId slotRequestId = new SlotRequestId();
+ final PhysicalSlotRequestBulkImpl bulk =
createPhysicalSlotRequestBulk(slotRequestId, new SlotRequestId());
+
+ final PhysicalSlot slot = addOneSlot();
+
+ bulk.markRequestFulfilled(slotRequestId,
slot.getAllocationId());
+
+ assertThat(isFulfillable(bulk), is(false));
+ }
+
+ @Test
+ public void testBulkUnfulfillableWithSlotOccupiedIndefinitely() {
+ final PhysicalSlotRequestBulk bulk =
createPhysicalSlotRequestBulk(new SlotRequestId(), new SlotRequestId());
+
+ final PhysicalSlot slot1 = addOneSlot();
+ addOneSlot();
+
+ occupyPhysicalSlot(slot1, true);
+
+ assertThat(isFulfillable(bulk), is(false));
+ }
+
+ @Test
+ public void testBulkFulfillableWithSlotOccupiedTemporarily() {
+ final PhysicalSlotRequestBulk bulk =
createPhysicalSlotRequestBulk(new SlotRequestId(), new SlotRequestId());
+
+ final PhysicalSlot slot1 = addOneSlot();
+ addOneSlot();
+
+ occupyPhysicalSlot(slot1, false);
+
+ assertThat(isFulfillable(bulk), is(true));
+ }
+
+ private PhysicalSlotRequestBulkWithTimestamp
createPhysicalSlotRequestBulkWithTimestamp(SlotRequestId... slotRequestIds) {
+ final PhysicalSlotRequestBulkWithTimestamp bulk = new
PhysicalSlotRequestBulkWithTimestamp(createPhysicalSlotRequestBulk(slotRequestIds));
+ bulk.markUnfulfillable(clock.relativeTimeMillis());
+ return bulk;
+ }
+
+ private static PhysicalSlotRequestBulkImpl
createPhysicalSlotRequestBulk(SlotRequestId... slotRequestIds) {
+ final TestingPhysicalSlotRequestBulkBuilder builder =
TestingPhysicalSlotRequestBulkBuilder.newBuilder();
+ for (SlotRequestId slotRequestId : slotRequestIds) {
+ builder.addPendingRequest(slotRequestId,
ResourceProfile.UNKNOWN);
+ }
+ return builder.buildPhysicalSlotRequestBulkImpl();
+ }
+
+ private PhysicalSlotRequestBulk
createPhysicalSlotRequestBulkWithCancellationFuture(
+ CompletableFuture<SlotRequestId> cancellationFuture,
+ SlotRequestId slotRequestId) {
+ return TestingPhysicalSlotRequestBulkBuilder
+ .newBuilder()
+ .addPendingRequest(slotRequestId,
ResourceProfile.UNKNOWN)
+ .setCanceller((id, t) ->
cancellationFuture.complete(id))
+ .buildPhysicalSlotRequestBulkImpl();
+ }
+
+ private PhysicalSlot addOneSlot() {
+ final PhysicalSlot slot = createPhysicalSlot();
+ slots.add(slot);
Review comment:
I think it is not guaranteed that changes to `slots` are visible to the
`bulkChecker`, which runs in the `mainThreadExecutor`, since the used `Set`
implementation (`HashSet`) is not thread-safe.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.util.DualKeyLinkedMap;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
+ private static final Logger LOG =
LoggerFactory.getLogger(SharedSlot.class);
+
+ private final SlotRequestId physicalSlotRequestId;
+
+ private final ResourceProfile physicalSlotResourceProfile;
+
+ private final ExecutionSlotSharingGroup executionSlotSharingGroup;
+
+ private final CompletableFuture<PhysicalSlot> slotContextFuture;
+
+ private final DualKeyLinkedMap<ExecutionVertexID, SlotRequestId,
CompletableFuture<SingleLogicalSlot>> requestedLogicalSlots;
+
+ private final boolean slotWillBeOccupiedIndefinitely;
+
+ private final Consumer<ExecutionSlotSharingGroup> releaseCallback;
+
+ SharedSlot(
+ SlotRequestId physicalSlotRequestId,
+ ResourceProfile physicalSlotResourceProfile,
+ ExecutionSlotSharingGroup executionSlotSharingGroup,
+ CompletableFuture<PhysicalSlot> slotContextFuture,
+ boolean slotWillBeOccupiedIndefinitely,
+ Consumer<ExecutionSlotSharingGroup> releaseCallback) {
+ this.physicalSlotRequestId = physicalSlotRequestId;
+ this.physicalSlotResourceProfile = physicalSlotResourceProfile;
+ this.executionSlotSharingGroup = executionSlotSharingGroup;
+ this.slotContextFuture =
slotContextFuture.thenApply(physicalSlot -> {
+ Preconditions.checkState(
+ physicalSlot.tryAssignPayload(this),
+ "Unexpected physical slot payload assignment
failure!");
+ return physicalSlot;
+ });
+ this.requestedLogicalSlots = new
DualKeyLinkedMap<>(executionSlotSharingGroup.getExecutionVertexIds().size());
+ this.slotWillBeOccupiedIndefinitely =
slotWillBeOccupiedIndefinitely;
+ this.releaseCallback = releaseCallback;
+ }
+
+ SlotRequestId getPhysicalSlotRequestId() {
+ return physicalSlotRequestId;
+ }
+
+ ResourceProfile getPhysicalSlotResourceProfile() {
+ return physicalSlotResourceProfile;
+ }
+
+ CompletableFuture<PhysicalSlot> getSlotContextFuture() {
+ return slotContextFuture;
+ }
+
+ CompletableFuture<LogicalSlot> allocateLogicalSlot(ExecutionVertexID
executionVertexId) {
+
Preconditions.checkArgument(executionSlotSharingGroup.getExecutionVertexIds().contains(executionVertexId));
+ CompletableFuture<SingleLogicalSlot> logicalSlotFuture =
requestedLogicalSlots.getValueByKeyA(executionVertexId);
+ if (logicalSlotFuture != null) {
+ LOG.debug("Request for {} already exists",
getLogicalSlotString(executionVertexId));
+ } else {
+ logicalSlotFuture =
allocateNonExistentLogicalSlot(executionVertexId);
+ }
+ return logicalSlotFuture.thenApply(Function.identity());
+ }
+
+ private CompletableFuture<SingleLogicalSlot>
allocateNonExistentLogicalSlot(ExecutionVertexID executionVertexId) {
+ CompletableFuture<SingleLogicalSlot> logicalSlotFuture;
+ SlotRequestId logicalSlotRequestId = new SlotRequestId();
+ String logMessageBase =
getLogicalSlotString(logicalSlotRequestId, executionVertexId);
+ LOG.debug("Request a {}", logMessageBase);
+
+ logicalSlotFuture = slotContextFuture
+ .thenApply(physicalSlot -> {
+ LOG.debug("Allocated {}", logMessageBase);
+ return createLogicalSlot(physicalSlot,
logicalSlotRequestId);
+ });
+ requestedLogicalSlots.put(executionVertexId,
logicalSlotRequestId, logicalSlotFuture);
+
+ // If the physical slot request fails (slotContextFuture), it
will also fail the logicalSlotFuture.
+ // Therefore, the next `exceptionally` callback will
cancelLogicalSlotRequest and do the cleanup
Review comment:
This comment is inaccurate: `exceptionally` calls `removeLogicalSlotRequest`,
not `cancelLogicalSlotRequest`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharingPhysicalSlotRequestBulk.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulk;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SharingPhysicalSlotRequestBulk implements PhysicalSlotRequestBulk {
+ private final Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>>
executions;
+
+ private final Map<ExecutionSlotSharingGroup, ResourceProfile>
pendingRequests;
+
+ private final Map<ExecutionSlotSharingGroup, AllocationID>
fulfilledRequests;
+
+ private final BiConsumer<ExecutionVertexID, Throwable>
logicalSlotRequestCanceller;
+
+ SharingPhysicalSlotRequestBulk(
+ Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>>
executions,
+ Map<ExecutionSlotSharingGroup, ResourceProfile>
pendingRequests,
+ Map<ExecutionSlotSharingGroup, AllocationID>
fulfilledRequests,
+ BiConsumer<ExecutionVertexID, Throwable>
logicalSlotRequestCanceller) {
+ this.executions = checkNotNull(executions);
+ this.pendingRequests = checkNotNull(pendingRequests);
+ this.fulfilledRequests = checkNotNull(fulfilledRequests);
+ this.logicalSlotRequestCanceller =
checkNotNull(logicalSlotRequestCanceller);
+ }
+
+ @Override
+ public Collection<ResourceProfile> getPendingRequests() {
+ return pendingRequests.values();
+ }
+
+ @Override
+ public Set<AllocationID> getAllocationIdsOfFulfilledRequests() {
+ return new HashSet<>(fulfilledRequests.values());
+ }
+
+ @Override
+ public void cancel(Throwable cause) {
+ // pending requests must be canceled first otherwise they might
be fulfilled by
+ // allocated slots released from this bulk
+ Stream
+ .concat(
+ pendingRequests.keySet().stream(),
+ fulfilledRequests.keySet().stream())
+ .flatMap(group -> executions.get(group).stream())
+ .forEach(id -> logicalSlotRequestCanceller.accept(id,
cause));
+ }
+
+ void markFulfilled(ExecutionSlotSharingGroup group, AllocationID
allocationID) {
+ pendingRequests.remove(group);
+ fulfilledRequests.put(group, allocationID);
+ }
+
+ void clear() {
+ pendingRequests.clear();
Review comment:
Why do we only clear the `pendingRequests` and not also the `fulfilledRequests`?
Maybe state the contract of this method in its JavaDoc.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -161,158 +173,66 @@ private SharedSlot getOrAllocateSharedSlot(
.thenCompose(slotProfile ->
slotProvider.allocatePhysicalSlot(
new
PhysicalSlotRequest(physicalSlotRequestId, slotProfile,
slotWillBeOccupiedIndefinitely)))
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
- return new SharedSlot(physicalSlotRequestId,
physicalSlotResourceProfile, group, physicalSlotFuture);
+ return new SharedSlot(
+ physicalSlotRequestId,
+ physicalSlotResourceProfile,
+ group,
+ physicalSlotFuture,
+ slotWillBeOccupiedIndefinitely,
+ this::releaseSharedSlot);
});
}
+ private void releaseSharedSlot(ExecutionSlotSharingGroup
executionSlotSharingGroup) {
+ SharedSlot slot = sharedSlots.remove(executionSlotSharingGroup);
+ if (slot != null) {
+ slotProvider.cancelSlotRequest(
+ slot.getPhysicalSlotRequestId(),
+ new FlinkException("Slot is being returned from
SlotSharingExecutionSlotAllocator."));
+ } else {
+ LOG.debug("There is no slot for
ExecutionSlotSharingGroup {} to release", executionSlotSharingGroup);
+ }
+ }
+
private ResourceProfile
getPhysicalSlotResourceProfile(ExecutionSlotSharingGroup
executionSlotSharingGroup) {
return executionSlotSharingGroup
.getExecutionVertexIds()
.stream()
.reduce(ResourceProfile.ZERO, (r, e) ->
r.merge(resourceProfileRetriever.apply(e)), ResourceProfile::merge);
}
- private class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
- private final SlotRequestId physicalSlotRequestId;
-
- private final ResourceProfile physicalSlotResourceProfile;
-
- private final ExecutionSlotSharingGroup
executionSlotSharingGroup;
-
- private final CompletableFuture<PhysicalSlot> slotContextFuture;
-
- private final DualKeyLinkedMap<ExecutionVertexID,
SlotRequestId, CompletableFuture<SingleLogicalSlot>> requestedLogicalSlots;
-
- private SharedSlot(
- SlotRequestId physicalSlotRequestId,
- ResourceProfile physicalSlotResourceProfile,
- ExecutionSlotSharingGroup
executionSlotSharingGroup,
- CompletableFuture<PhysicalSlot>
slotContextFuture) {
- this.physicalSlotRequestId = physicalSlotRequestId;
- this.physicalSlotResourceProfile =
physicalSlotResourceProfile;
- this.executionSlotSharingGroup =
executionSlotSharingGroup;
- this.slotContextFuture =
slotContextFuture.thenApply(physicalSlot -> {
- Preconditions.checkState(
- physicalSlot.tryAssignPayload(this),
- "Unexpected physical slot payload
assignment failure!");
- return physicalSlot;
+ private SharingPhysicalSlotRequestBulk
createBulk(Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executions) {
+ Map<ExecutionSlotSharingGroup, ResourceProfile> pendingRequests
= executions
+ .keySet()
+ .stream()
+ .collect(Collectors.toMap(
+ group -> group,
+ group ->
sharedSlots.get(group).getPhysicalSlotResourceProfile()
+ ));
+ Map<ExecutionSlotSharingGroup, AllocationID> fulfilledRequests
= new HashMap<>();
+ SharingPhysicalSlotRequestBulk bulk = new
SharingPhysicalSlotRequestBulk(
+ executions,
+ pendingRequests,
+ fulfilledRequests,
Review comment:
Why are we passing in an empty map here?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
##########
@@ -223,34 +232,52 @@ public void
testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() {
});
}
- private static void testLogicalSlotRequestCancellation(
+ @Test
+ public void
testCompletedLogicalSlotCancelationDoesNotCancelPhysicalSlotRequestAndDoesNotRemoveSharedSlot()
{
+ // physical slot request is completed and completes logical
requests
+ testLogicalSlotRequestCancellationOrRelease(
+ false,
+ false,
+ (context, assignment) -> {
+
context.getAllocator().cancel(assignment.getExecutionVertexId());
+ try {
+ assignment.getLogicalSlotFuture().get();
+ } catch (InterruptedException |
ExecutionException e) {
+ throw new
FlinkRuntimeException("Unexpected", e);
+ }
+ });
+ }
+
+ private static void testLogicalSlotRequestCancellationOrRelease(
boolean completePhysicalSlotFutureManually,
- BiConsumer<AllocationContext,
SlotExecutionVertexAssignment> cancelAction) {
+ boolean cancelsPhysicalSlotRequestAndRemovesSharedSlot,
+ BiConsumer<AllocationContext,
SlotExecutionVertexAssignment> cancelOrReleaseAction) {
Review comment:
Maybe make this a `BiConsumerWithException`; then we wouldn't need the
try-catch blocks.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
##########
@@ -87,9 +86,12 @@
SlotRequestId slotRequestId,
ResourceProfile resourceProfile,
boolean willSlotBeOccupiedIndefinitely) {
- return willSlotBeOccupiedIndefinitely ?
- slotPool.requestNewAllocatedSlot(slotRequestId,
resourceProfile, null) :
- slotPool.requestNewAllocatedBatchSlot(slotRequestId,
resourceProfile);
+ if (willSlotBeOccupiedIndefinitely) {
+ return slotPool.requestNewAllocatedSlot(slotRequestId,
resourceProfile, null);
+ } else {
+ slotPool.disableBatchSlotRequestTimeoutCheck();
Review comment:
Hmm, one way to solve this problem is to not use this class in the
`SchedulerImpl`, right?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharingPhysicalSlotRequestBulk.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulk;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SharingPhysicalSlotRequestBulk implements PhysicalSlotRequestBulk {
Review comment:
This class is lacking tests.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -161,158 +173,66 @@ private SharedSlot getOrAllocateSharedSlot(
.thenCompose(slotProfile ->
slotProvider.allocatePhysicalSlot(
new
PhysicalSlotRequest(physicalSlotRequestId, slotProfile,
slotWillBeOccupiedIndefinitely)))
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
- return new SharedSlot(physicalSlotRequestId,
physicalSlotResourceProfile, group, physicalSlotFuture);
+ return new SharedSlot(
+ physicalSlotRequestId,
+ physicalSlotResourceProfile,
+ group,
+ physicalSlotFuture,
+ slotWillBeOccupiedIndefinitely,
+ this::releaseSharedSlot);
});
}
+ private void releaseSharedSlot(ExecutionSlotSharingGroup
executionSlotSharingGroup) {
+ SharedSlot slot = sharedSlots.remove(executionSlotSharingGroup);
+ if (slot != null) {
+ slotProvider.cancelSlotRequest(
+ slot.getPhysicalSlotRequestId(),
+ new FlinkException("Slot is being returned from
SlotSharingExecutionSlotAllocator."));
+ } else {
+ LOG.debug("There is no slot for
ExecutionSlotSharingGroup {} to release", executionSlotSharingGroup);
Review comment:
What are the conditions under which this branch can happen? Can't we say
that there must be a `SharedSlot` when this method is called? Making this
stricter could allow us to catch other programming errors faster.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.util.DualKeyLinkedMap;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
Review comment:
This class is lacking tests.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.util.DualKeyLinkedMap;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
+ private static final Logger LOG =
LoggerFactory.getLogger(SharedSlot.class);
+
+ private final SlotRequestId physicalSlotRequestId;
+
+ private final ResourceProfile physicalSlotResourceProfile;
+
+ private final ExecutionSlotSharingGroup executionSlotSharingGroup;
+
+ private final CompletableFuture<PhysicalSlot> slotContextFuture;
+
+ private final DualKeyLinkedMap<ExecutionVertexID, SlotRequestId,
CompletableFuture<SingleLogicalSlot>> requestedLogicalSlots;
+
+ private final boolean slotWillBeOccupiedIndefinitely;
+
+ private final Consumer<ExecutionSlotSharingGroup> releaseCallback;
+
+ SharedSlot(
+ SlotRequestId physicalSlotRequestId,
+ ResourceProfile physicalSlotResourceProfile,
+ ExecutionSlotSharingGroup executionSlotSharingGroup,
+ CompletableFuture<PhysicalSlot> slotContextFuture,
+ boolean slotWillBeOccupiedIndefinitely,
+ Consumer<ExecutionSlotSharingGroup> releaseCallback) {
+ this.physicalSlotRequestId = physicalSlotRequestId;
+ this.physicalSlotResourceProfile = physicalSlotResourceProfile;
+ this.executionSlotSharingGroup = executionSlotSharingGroup;
+ this.slotContextFuture =
slotContextFuture.thenApply(physicalSlot -> {
+ Preconditions.checkState(
+ physicalSlot.tryAssignPayload(this),
+ "Unexpected physical slot payload assignment
failure!");
+ return physicalSlot;
+ });
+ this.requestedLogicalSlots = new
DualKeyLinkedMap<>(executionSlotSharingGroup.getExecutionVertexIds().size());
+ this.slotWillBeOccupiedIndefinitely =
slotWillBeOccupiedIndefinitely;
+ this.releaseCallback = releaseCallback;
+ }
+
+ SlotRequestId getPhysicalSlotRequestId() {
+ return physicalSlotRequestId;
+ }
+
+ ResourceProfile getPhysicalSlotResourceProfile() {
+ return physicalSlotResourceProfile;
+ }
+
+ CompletableFuture<PhysicalSlot> getSlotContextFuture() {
+ return slotContextFuture;
+ }
+
+ CompletableFuture<LogicalSlot> allocateLogicalSlot(ExecutionVertexID
executionVertexId) {
+
Preconditions.checkArgument(executionSlotSharingGroup.getExecutionVertexIds().contains(executionVertexId));
Review comment:
An error message could be helpful.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharingPhysicalSlotRequestBulk.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulk;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SharingPhysicalSlotRequestBulk implements PhysicalSlotRequestBulk {
Review comment:
A bit of `JavaDoc` explaining the purpose of this class could be helpful.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharingPhysicalSlotRequestBulk.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulk;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SharingPhysicalSlotRequestBulk implements PhysicalSlotRequestBulk {
+ private final Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>>
executions;
+
+ private final Map<ExecutionSlotSharingGroup, ResourceProfile>
pendingRequests;
+
+ private final Map<ExecutionSlotSharingGroup, AllocationID>
fulfilledRequests;
+
+ private final BiConsumer<ExecutionVertexID, Throwable>
logicalSlotRequestCanceller;
+
+ SharingPhysicalSlotRequestBulk(
+ Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>>
executions,
+ Map<ExecutionSlotSharingGroup, ResourceProfile>
pendingRequests,
+ Map<ExecutionSlotSharingGroup, AllocationID>
fulfilledRequests,
+ BiConsumer<ExecutionVertexID, Throwable>
logicalSlotRequestCanceller) {
+ this.executions = checkNotNull(executions);
+ this.pendingRequests = checkNotNull(pendingRequests);
+ this.fulfilledRequests = checkNotNull(fulfilledRequests);
+ this.logicalSlotRequestCanceller =
checkNotNull(logicalSlotRequestCanceller);
+ }
+
+ @Override
+ public Collection<ResourceProfile> getPendingRequests() {
+ return pendingRequests.values();
+ }
+
+ @Override
+ public Set<AllocationID> getAllocationIdsOfFulfilledRequests() {
+ return new HashSet<>(fulfilledRequests.values());
+ }
+
+ @Override
+ public void cancel(Throwable cause) {
+ // pending requests must be canceled first otherwise they might
be fulfilled by
+ // allocated slots released from this bulk
+ Stream
+ .concat(
+ pendingRequests.keySet().stream(),
+ fulfilledRequests.keySet().stream())
+ .flatMap(group -> executions.get(group).stream())
+ .forEach(id -> logicalSlotRequestCanceller.accept(id,
cause));
Review comment:
Moreover, this would make it more explicit that the contract stated in the
comment above (pending requests are canceled before fulfilled ones) is enforced.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org