azagrebin commented on a change in pull request #13181:
URL: https://github.com/apache/flink/pull/13181#discussion_r481278689



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class PhysicalSlotRequestBulkCheckerImpl implements 
PhysicalSlotRequestBulkChecker {
+
+       private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+       private final Supplier<Set<SlotInfo>> slotsRetriever;
+
+       private final Clock clock;
+
+       PhysicalSlotRequestBulkCheckerImpl(final Supplier<Set<SlotInfo>> 
slotsRetriever, final Clock clock) {
+               this.slotsRetriever = checkNotNull(slotsRetriever);
+               this.clock = checkNotNull(clock);
+       }
+
+       void start(final ComponentMainThreadExecutor mainThreadExecutor) {
+               this.componentMainThreadExecutor = mainThreadExecutor;
+       }
+
+       @Override
+       public void 
schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulk bulk, Time 
timeout) {
+               PhysicalSlotRequestBulkWithTimestamp bulkWithTimestamp = new 
PhysicalSlotRequestBulkWithTimestamp(bulk);
+               bulkWithTimestamp.markUnfulfillable(clock.relativeTimeMillis());
+               schedulePendingRequestBulkTimeoutCheck(bulkWithTimestamp, 
timeout);
+       }
+
+       private void 
schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulkWithTimestamp 
bulk, Time timeout) {
+               componentMainThreadExecutor.schedule(() -> {
+                       TimeoutCheckResult result = 
checkPhysicalSlotRequestBulkTimeout(bulk, timeout);
+
+                       switch (result) {
+                               case PENDING:
+                                       //re-schedule the timeout check
+                                       
schedulePendingRequestBulkTimeoutCheck(bulk, timeout);
+                                       break;
+                               case TIMEOUT:
+                                       bulk.cancel(new TimeoutException("Slot 
request bulk is not fulfillable!"));
+                                       break;
+                               case FULFILLED:
+                               default:
+                                       // no action to take
+                                       break;
+                       }
+               }, timeout.getSize(), timeout.getUnit());
+       }
+
+       /**
+        * Check the slot request bulk and timeout its requests if it has been 
unfulfillable for too long.
+        * @param slotRequestBulk bulk of slot requests
+        * @param slotRequestTimeout indicates how long a pending request can 
be unfulfillable
+        * @return result of the check, indicating the bulk is fulfilled, still 
pending, or timed out
+        */
+       @VisibleForTesting
+       TimeoutCheckResult checkPhysicalSlotRequestBulkTimeout(
+                       final PhysicalSlotRequestBulkWithTimestamp 
slotRequestBulk,
+                       final Time slotRequestTimeout) {
+
+               if (slotRequestBulk.getPendingRequests().isEmpty()) {
+                       return TimeoutCheckResult.FULFILLED;
+               }
+
+               final boolean fulfillable = 
isSlotRequestBulkFulfillable(slotRequestBulk, slotsRetriever);
+               if (fulfillable) {
+                       slotRequestBulk.markFulfillable();
+               } else {
+                       final long currentTimestamp = 
clock.relativeTimeMillis();
+
+                       slotRequestBulk.markUnfulfillable(currentTimestamp);
+
+                       final long unfulfillableSince = 
slotRequestBulk.getUnfulfillableSince();
+                       if (unfulfillableSince + 
slotRequestTimeout.toMilliseconds() <= currentTimestamp) {
+                               return TimeoutCheckResult.TIMEOUT;
+                       }
+               }
+
+               return TimeoutCheckResult.PENDING;
+       }
+
+       /**
+        * Returns whether the given bulk of slot requests are possible to be 
fulfilled at the same time
+        * with all the reusable slots in the slot pool. A reusable slot means 
the slot is available or
+        * will not be occupied indefinitely.
+        *
+        * @param slotRequestBulk bulk of slot requests to check
+        * @param slotsRetriever supplies slots to be used for the 
fulfill-ability check
+        * @return true if the slot requests are possible to be fulfilled, 
otherwise false
+        */
+       @VisibleForTesting
+       static boolean isSlotRequestBulkFulfillable(
+                       final PhysicalSlotRequestBulk slotRequestBulk,
+                       final Supplier<Set<SlotInfo>> slotsRetriever) {
+
+               final Set<AllocationID> assignedSlots = 
slotRequestBulk.getAllocationIdsOfFulfilledRequests();
+               final Set<SlotInfo> reusableSlots = 
getReusableSlots(slotsRetriever, assignedSlots);
+               return 
areRequestsFulfillableWithSlots(slotRequestBulk.getPendingRequests(), 
reusableSlots);
+       }
+
+       private static Set<SlotInfo> getReusableSlots(
+                       final Supplier<Set<SlotInfo>> slotsRetriever,
+                       final Set<AllocationID> slotsToExclude) {
+
+               return slotsRetriever.get().stream()
+                       .filter(slotInfo -> 
!slotInfo.willBeOccupiedIndefinitely())
+                       .filter(slotInfo -> 
!slotsToExclude.contains(slotInfo.getAllocationId()))
+                       .collect(Collectors.toSet());
+       }
+
+       private static boolean areRequestsFulfillableWithSlots(
+                       final Collection<ResourceProfile> 
requestResourceProfiles,
+                       final Set<SlotInfo> slots) {
+
+               final Set<SlotInfo> remainingSlots = new HashSet<>(slots);
+               for (ResourceProfile requestResourceProfile : 
requestResourceProfiles) {
+                       final Optional<SlotInfo> matchedSlot = 
findMatchingSlotForRequest(requestResourceProfile, remainingSlots);
+                       if (matchedSlot.isPresent()) {
+                               remainingSlots.remove(matchedSlot.get());
+                       } else {
+                               return false;
+                       }
+               }

Review comment:
       yes




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to