tillrohrmann commented on a change in pull request #13181:
URL: https://github.com/apache/flink/pull/13181#discussion_r476318798



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
##########
@@ -98,14 +101,15 @@ public SchedulerImpl(
                        "Scheduler is not initialized with proper main thread 
executor. " +
                                "Call to Scheduler.start(...) required.");
 
-               this.bulkSlotProvider = new 
BulkSlotProviderImpl(slotSelectionStrategy, slotPool);
+               this.slotRequestBulkChecker = 
PhysicalSlotRequestBulkCheckerImpl.fromSlotPool(slotPool, 
SystemClock.getInstance());
+               this.bulkSlotProvider = new 
BulkSlotProviderImpl(slotSelectionStrategy, slotPool, slotRequestBulkChecker);

Review comment:
       This is a bit unrelated because the change was introduced earlier, but I 
was wondering why we have to touch `SchedulerImpl` at all. Isn't this 
implementation only used by the non-pipelined region scheduler implementations? 
Maybe it would have been easier to split the scheduler implementations into 
`SchedulerImpl` for the old strategies and a new `PipelinedRegionScheduler` 
implementation which only supports the bulk physical slot requests.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
##########
@@ -87,9 +86,12 @@
                        SlotRequestId slotRequestId,
                        ResourceProfile resourceProfile,
                        boolean willSlotBeOccupiedIndefinitely) {
-               return willSlotBeOccupiedIndefinitely ?
-                       slotPool.requestNewAllocatedSlot(slotRequestId, 
resourceProfile, null) :
-                       slotPool.requestNewAllocatedBatchSlot(slotRequestId, 
resourceProfile);
+               if (willSlotBeOccupiedIndefinitely) {
+                       return slotPool.requestNewAllocatedSlot(slotRequestId, 
resourceProfile, null);
+               } else {
+                       slotPool.disableBatchSlotRequestTimeoutCheck();

Review comment:
       Why can't this happen in the constructor?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
##########
@@ -18,131 +18,14 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
-import org.apache.flink.util.clock.Clock;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * This class helps to check the status of physical slot request bulks.
+ * This class tracks a fulfil-ability timeout of a bulk of physical slot 
requests.
+ *
+ * <p>The bulk gets canceled if the timeout occurs and the bulk is not 
fulfillable.

Review comment:
       Nit: inconsistent spelling of `fulfill` (`fulfil-ability` vs. `fulfillable`).

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulk.java
##########
@@ -20,60 +20,17 @@
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
 
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Set;
 
 /**
  * Represents a bulk of physical slot requests.
  */
-class PhysicalSlotRequestBulk {
+public interface PhysicalSlotRequestBulk {
+       Collection<ResourceProfile> getPendingRequests();
 
-       private final Map<SlotRequestId, ResourceProfile> pendingRequests;
+       Set<AllocationID> getAllocationIdsOfFulfilledRequests();
 
-       private final Map<SlotRequestId, AllocationID> fulfilledRequests = new 
HashMap<>();
-
-       private long unfulfillableTimestamp = Long.MAX_VALUE;
-
-       PhysicalSlotRequestBulk(final Collection<PhysicalSlotRequest> 
physicalSlotRequests) {
-               this.pendingRequests = physicalSlotRequests.stream()
-                       .collect(Collectors.toMap(
-                               PhysicalSlotRequest::getSlotRequestId,
-                               r -> 
r.getSlotProfile().getPhysicalSlotResourceProfile()));
-       }
-
-       void markRequestFulfilled(final SlotRequestId slotRequestId, final 
AllocationID allocationID) {
-               pendingRequests.remove(slotRequestId);
-               fulfilledRequests.put(slotRequestId, allocationID);
-       }
-
-       Map<SlotRequestId, ResourceProfile> getPendingRequests() {
-               return Collections.unmodifiableMap(pendingRequests);
-       }
-
-       Map<SlotRequestId, AllocationID> getFulfilledRequests() {
-               return Collections.unmodifiableMap(fulfilledRequests);
-       }
-
-       void markFulfillable() {
-               unfulfillableTimestamp = Long.MAX_VALUE;
-       }
-
-       void markUnfulfillable(final long currentTimestamp) {
-               if (isFulfillable()) {
-                       unfulfillableTimestamp = currentTimestamp;
-               }
-       }
-
-       long getUnfulfillableSince() {
-               return unfulfillableTimestamp;
-       }
-
-       private boolean isFulfillable() {
-               return unfulfillableTimestamp == Long.MAX_VALUE;
-       }
+       void cancel(Throwable cause);

Review comment:
       Interfaces should always have JavaDocs stating their contracts.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
##########
@@ -18,131 +18,14 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
-import org.apache.flink.util.clock.Clock;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * This class helps to check the status of physical slot request bulks.
+ * This class tracks a fulfil-ability timeout of a bulk of physical slot 
requests.
+ *
+ * <p>The bulk gets canceled if the timeout occurs and the bulk is not 
fulfillable.
  */
-class PhysicalSlotRequestBulkChecker {
-
-       private final Supplier<Set<SlotInfo>> slotsRetriever;
-
-       private final Clock clock;
-
-       PhysicalSlotRequestBulkChecker(final Supplier<Set<SlotInfo>> 
slotsRetriever, final Clock clock) {
-               this.slotsRetriever = checkNotNull(slotsRetriever);
-               this.clock = checkNotNull(clock);
-       }
-
-       PhysicalSlotRequestBulk createPhysicalSlotRequestBulk(final 
Collection<PhysicalSlotRequest> physicalSlotRequests) {
-               final PhysicalSlotRequestBulk slotRequestBulk = new 
PhysicalSlotRequestBulk(physicalSlotRequests);
-               slotRequestBulk.markUnfulfillable(clock.relativeTimeMillis());
-
-               return slotRequestBulk;
-       }
-
-       /**
-        * Check the slot request bulk and timeout its requests if it has been 
unfulfillable for too long.
-        * @param slotRequestBulk bulk of slot requests
-        * @param slotRequestTimeout indicates how long a pending request can 
be unfulfillable
-        * @return result of the check, indicating the bulk is fulfilled, still 
pending, or timed out
-        */
-       TimeoutCheckResult checkPhysicalSlotRequestBulkTimeout(
-                       final PhysicalSlotRequestBulk slotRequestBulk,
-                       final Time slotRequestTimeout) {
-
-               if (slotRequestBulk.getPendingRequests().isEmpty()) {
-                       return TimeoutCheckResult.FULFILLED;
-               }
-
-               final boolean fulfillable = 
isSlotRequestBulkFulfillable(slotRequestBulk, slotsRetriever);
-               if (fulfillable) {
-                       slotRequestBulk.markFulfillable();
-               } else {
-                       final long currentTimestamp = 
clock.relativeTimeMillis();
-
-                       slotRequestBulk.markUnfulfillable(currentTimestamp);
-
-                       final long unfulfillableSince = 
slotRequestBulk.getUnfulfillableSince();
-                       if (unfulfillableSince + 
slotRequestTimeout.toMilliseconds() <= currentTimestamp) {
-                               return TimeoutCheckResult.TIMEOUT;
-                       }
-               }
-
-               return TimeoutCheckResult.PENDING;
-       }
-
-       /**
-        * Returns whether the given bulk of slot requests are possible to be 
fulfilled at the same time
-        * with all the reusable slots in the slot pool. A reusable slot means 
the slot is available or
-        * will not be occupied indefinitely.
-        *
-        * @param slotRequestBulk bulk of slot requests to check
-        * @param slotsRetriever supplies slots to be used for the 
fulfill-ability check
-        * @return true if the slot requests are possible to be fulfilled, 
otherwise false
-        */
-       @VisibleForTesting
-       static boolean isSlotRequestBulkFulfillable(
-                       final PhysicalSlotRequestBulk slotRequestBulk,
-                       final Supplier<Set<SlotInfo>> slotsRetriever) {
-
-               final Set<AllocationID> assignedSlots = new 
HashSet<>(slotRequestBulk.getFulfilledRequests().values());
-               final Set<SlotInfo> reusableSlots = 
getReusableSlots(slotsRetriever, assignedSlots);
-               return 
areRequestsFulfillableWithSlots(slotRequestBulk.getPendingRequests().values(), 
reusableSlots);
-       }
-
-       private static Set<SlotInfo> getReusableSlots(
-                       final Supplier<Set<SlotInfo>> slotsRetriever,
-                       final Set<AllocationID> slotsToExclude) {
-
-               return slotsRetriever.get().stream()
-                       .filter(slotInfo -> 
!slotInfo.willBeOccupiedIndefinitely())
-                       .filter(slotInfo -> 
!slotsToExclude.contains(slotInfo.getAllocationId()))
-                       .collect(Collectors.toSet());
-       }
-
-       private static boolean areRequestsFulfillableWithSlots(
-                       final Collection<ResourceProfile> 
requestResourceProfiles,
-                       final Set<SlotInfo> slots) {
-
-               final Set<SlotInfo> remainingSlots = new HashSet<>(slots);
-               for (ResourceProfile requestResourceProfile : 
requestResourceProfiles) {
-                       final Optional<SlotInfo> matchedSlot = 
findMatchingSlotForRequest(requestResourceProfile, remainingSlots);
-                       if (matchedSlot.isPresent()) {
-                               remainingSlots.remove(matchedSlot.get());
-                       } else {
-                               return false;
-                       }
-               }
-               return true;
-       }
-
-       private static Optional<SlotInfo> findMatchingSlotForRequest(
-                       final ResourceProfile requestResourceProfile,
-                       final Collection<SlotInfo> slots) {
-
-               return slots.stream().filter(slot -> 
slot.getResourceProfile().isMatching(requestResourceProfile)).findFirst();
-       }
-
-       enum TimeoutCheckResult {
-               PENDING,
-
-               FULFILLED,
-
-               TIMEOUT
-       }
+@FunctionalInterface
+public interface PhysicalSlotRequestBulkChecker {
+       void schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulk 
bulk, Time timeout);

Review comment:
       Interfaces should always have JavaDocs stating their contracts.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to