tillrohrmann commented on a change in pull request #13181:
URL: https://github.com/apache/flink/pull/13181#discussion_r476318798
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
##########
@@ -98,14 +101,15 @@ public SchedulerImpl(
"Scheduler is not initialized with proper main thread
executor. " +
"Call to Scheduler.start(...) required.");
- this.bulkSlotProvider = new
BulkSlotProviderImpl(slotSelectionStrategy, slotPool);
+ this.slotRequestBulkChecker =
PhysicalSlotRequestBulkCheckerImpl.fromSlotPool(slotPool,
SystemClock.getInstance());
+ this.bulkSlotProvider = new
BulkSlotProviderImpl(slotSelectionStrategy, slotPool, slotRequestBulkChecker);
Review comment:
This is a bit unrelated because the change was introduced earlier, but I
was wondering why we have to touch the `SchedulerImpl` at all. Isn't this
implementation only used by the non-pipelined region scheduler implementations?
Maybe it would have been easier to separate the scheduler implementations into
`SchedulerImpl` for the old strategies and a new `PipelinedRegionScheduler`
implementation which only supports the bulk physical slot requests.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
##########
@@ -87,9 +86,12 @@
SlotRequestId slotRequestId,
ResourceProfile resourceProfile,
boolean willSlotBeOccupiedIndefinitely) {
- return willSlotBeOccupiedIndefinitely ?
- slotPool.requestNewAllocatedSlot(slotRequestId,
resourceProfile, null) :
- slotPool.requestNewAllocatedBatchSlot(slotRequestId,
resourceProfile);
+ if (willSlotBeOccupiedIndefinitely) {
+ return slotPool.requestNewAllocatedSlot(slotRequestId,
resourceProfile, null);
+ } else {
+ slotPool.disableBatchSlotRequestTimeoutCheck();
Review comment:
Why can't this happen in the constructor?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
##########
@@ -18,131 +18,14 @@
package org.apache.flink.runtime.jobmaster.slotpool;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
-import org.apache.flink.util.clock.Clock;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * This class helps to check the status of physical slot request bulks.
+ * This class tracks a fulfil-ability timeout of a bulk of physical slot
requests.
+ *
+ * <p>The bulk gets canceled if the timeout occurs and the bulk is not
fulfillable.
Review comment:
Nit: inconsistent spelling of `fulfill` (`fulfil-ability` vs. `fulfillable`).
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulk.java
##########
@@ -20,60 +20,17 @@
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Set;
/**
* Represents a bulk of physical slot requests.
*/
-class PhysicalSlotRequestBulk {
+public interface PhysicalSlotRequestBulk {
+ Collection<ResourceProfile> getPendingRequests();
- private final Map<SlotRequestId, ResourceProfile> pendingRequests;
+ Set<AllocationID> getAllocationIdsOfFulfilledRequests();
- private final Map<SlotRequestId, AllocationID> fulfilledRequests = new
HashMap<>();
-
- private long unfulfillableTimestamp = Long.MAX_VALUE;
-
- PhysicalSlotRequestBulk(final Collection<PhysicalSlotRequest>
physicalSlotRequests) {
- this.pendingRequests = physicalSlotRequests.stream()
- .collect(Collectors.toMap(
- PhysicalSlotRequest::getSlotRequestId,
- r ->
r.getSlotProfile().getPhysicalSlotResourceProfile()));
- }
-
- void markRequestFulfilled(final SlotRequestId slotRequestId, final
AllocationID allocationID) {
- pendingRequests.remove(slotRequestId);
- fulfilledRequests.put(slotRequestId, allocationID);
- }
-
- Map<SlotRequestId, ResourceProfile> getPendingRequests() {
- return Collections.unmodifiableMap(pendingRequests);
- }
-
- Map<SlotRequestId, AllocationID> getFulfilledRequests() {
- return Collections.unmodifiableMap(fulfilledRequests);
- }
-
- void markFulfillable() {
- unfulfillableTimestamp = Long.MAX_VALUE;
- }
-
- void markUnfulfillable(final long currentTimestamp) {
- if (isFulfillable()) {
- unfulfillableTimestamp = currentTimestamp;
- }
- }
-
- long getUnfulfillableSince() {
- return unfulfillableTimestamp;
- }
-
- private boolean isFulfillable() {
- return unfulfillableTimestamp == Long.MAX_VALUE;
- }
+ void cancel(Throwable cause);
Review comment:
Interfaces should always have JavaDocs stating their contracts.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
##########
@@ -18,131 +18,14 @@
package org.apache.flink.runtime.jobmaster.slotpool;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
-import org.apache.flink.util.clock.Clock;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * This class helps to check the status of physical slot request bulks.
+ * This class tracks a fulfil-ability timeout of a bulk of physical slot
requests.
+ *
+ * <p>The bulk gets canceled if the timeout occurs and the bulk is not
fulfillable.
*/
-class PhysicalSlotRequestBulkChecker {
-
- private final Supplier<Set<SlotInfo>> slotsRetriever;
-
- private final Clock clock;
-
- PhysicalSlotRequestBulkChecker(final Supplier<Set<SlotInfo>>
slotsRetriever, final Clock clock) {
- this.slotsRetriever = checkNotNull(slotsRetriever);
- this.clock = checkNotNull(clock);
- }
-
- PhysicalSlotRequestBulk createPhysicalSlotRequestBulk(final
Collection<PhysicalSlotRequest> physicalSlotRequests) {
- final PhysicalSlotRequestBulk slotRequestBulk = new
PhysicalSlotRequestBulk(physicalSlotRequests);
- slotRequestBulk.markUnfulfillable(clock.relativeTimeMillis());
-
- return slotRequestBulk;
- }
-
- /**
- * Check the slot request bulk and timeout its requests if it has been
unfulfillable for too long.
- * @param slotRequestBulk bulk of slot requests
- * @param slotRequestTimeout indicates how long a pending request can
be unfulfillable
- * @return result of the check, indicating the bulk is fulfilled, still
pending, or timed out
- */
- TimeoutCheckResult checkPhysicalSlotRequestBulkTimeout(
- final PhysicalSlotRequestBulk slotRequestBulk,
- final Time slotRequestTimeout) {
-
- if (slotRequestBulk.getPendingRequests().isEmpty()) {
- return TimeoutCheckResult.FULFILLED;
- }
-
- final boolean fulfillable =
isSlotRequestBulkFulfillable(slotRequestBulk, slotsRetriever);
- if (fulfillable) {
- slotRequestBulk.markFulfillable();
- } else {
- final long currentTimestamp =
clock.relativeTimeMillis();
-
- slotRequestBulk.markUnfulfillable(currentTimestamp);
-
- final long unfulfillableSince =
slotRequestBulk.getUnfulfillableSince();
- if (unfulfillableSince +
slotRequestTimeout.toMilliseconds() <= currentTimestamp) {
- return TimeoutCheckResult.TIMEOUT;
- }
- }
-
- return TimeoutCheckResult.PENDING;
- }
-
- /**
- * Returns whether the given bulk of slot requests are possible to be
fulfilled at the same time
- * with all the reusable slots in the slot pool. A reusable slot means
the slot is available or
- * will not be occupied indefinitely.
- *
- * @param slotRequestBulk bulk of slot requests to check
- * @param slotsRetriever supplies slots to be used for the
fulfill-ability check
- * @return true if the slot requests are possible to be fulfilled,
otherwise false
- */
- @VisibleForTesting
- static boolean isSlotRequestBulkFulfillable(
- final PhysicalSlotRequestBulk slotRequestBulk,
- final Supplier<Set<SlotInfo>> slotsRetriever) {
-
- final Set<AllocationID> assignedSlots = new
HashSet<>(slotRequestBulk.getFulfilledRequests().values());
- final Set<SlotInfo> reusableSlots =
getReusableSlots(slotsRetriever, assignedSlots);
- return
areRequestsFulfillableWithSlots(slotRequestBulk.getPendingRequests().values(),
reusableSlots);
- }
-
- private static Set<SlotInfo> getReusableSlots(
- final Supplier<Set<SlotInfo>> slotsRetriever,
- final Set<AllocationID> slotsToExclude) {
-
- return slotsRetriever.get().stream()
- .filter(slotInfo ->
!slotInfo.willBeOccupiedIndefinitely())
- .filter(slotInfo ->
!slotsToExclude.contains(slotInfo.getAllocationId()))
- .collect(Collectors.toSet());
- }
-
- private static boolean areRequestsFulfillableWithSlots(
- final Collection<ResourceProfile>
requestResourceProfiles,
- final Set<SlotInfo> slots) {
-
- final Set<SlotInfo> remainingSlots = new HashSet<>(slots);
- for (ResourceProfile requestResourceProfile :
requestResourceProfiles) {
- final Optional<SlotInfo> matchedSlot =
findMatchingSlotForRequest(requestResourceProfile, remainingSlots);
- if (matchedSlot.isPresent()) {
- remainingSlots.remove(matchedSlot.get());
- } else {
- return false;
- }
- }
- return true;
- }
-
- private static Optional<SlotInfo> findMatchingSlotForRequest(
- final ResourceProfile requestResourceProfile,
- final Collection<SlotInfo> slots) {
-
- return slots.stream().filter(slot ->
slot.getResourceProfile().isMatching(requestResourceProfile)).findFirst();
- }
-
- enum TimeoutCheckResult {
- PENDING,
-
- FULFILLED,
-
- TIMEOUT
- }
+@FunctionalInterface
+public interface PhysicalSlotRequestBulkChecker {
+ void schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulk
bulk, Time timeout);
Review comment:
Interfaces should always have JavaDocs stating their contracts.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]