zhuzhurk commented on a change in pull request #13181:
URL: https://github.com/apache/flink/pull/13181#discussion_r474428620



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class PhysicalSlotRequestBulkCheckerImpl implements PhysicalSlotRequestBulkChecker {
+
+       private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+       private final Supplier<Set<SlotInfo>> slotsRetriever;
+
+       private final Clock clock;
+
+       PhysicalSlotRequestBulkCheckerImpl(final Supplier<Set<SlotInfo>> slotsRetriever, final Clock clock) {
+               this.slotsRetriever = checkNotNull(slotsRetriever);
+               this.clock = checkNotNull(clock);
+       }
+
+       void start(final ComponentMainThreadExecutor mainThreadExecutor) {
+               this.componentMainThreadExecutor = mainThreadExecutor;
+       }
+
+       @Override
+       public void schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulk bulk, Time timeout) {
+               schedulePendingRequestBulkTimeoutCheck(new PhysicalSlotRequestBulkWithTimestamp(bulk), timeout);
+       }
+
+       private void schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulkWithTimestamp bulk, Time timeout) {
+               bulk.markUnfulfillable(clock.relativeTimeMillis());

Review comment:
       If we implement it like this, the bulk has to pass every fulfill-ability
check, and it fails otherwise. This means a request can be recognized as timed
out if a slot gets lost right before a fulfill-ability check, even if the bulk
passed the previous check.
   
   How about moving this `markUnfulfillable()` call into the
`schedulePendingRequestBulkTimeoutCheck()` overload above?
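
   To make the scenario concrete, here is a minimal sketch of the timing problem. It is not the Flink code: `BulkTimestampSketch` and `TimeoutCheckSketch` are hypothetical stand-ins, and the sketch assumes that `markUnfulfillable()` only records a timestamp when the bulk is currently considered fulfillable. With a timeout of 100 ms, the bulk passes the check at t=100, loses a slot shortly before t=200, and is then checked again.

```java
// Hypothetical stand-in for PhysicalSlotRequestBulkWithTimestamp (not the Flink class).
final class BulkTimestampSketch {
    private static final long FULFILLABLE = Long.MAX_VALUE;
    private long unfulfillableSince = FULFILLABLE;

    void markFulfillable() {
        unfulfillableSince = FULFILLABLE;
    }

    // Assumption: the timestamp is only recorded if the bulk is currently marked fulfillable.
    void markUnfulfillable(long now) {
        if (unfulfillableSince == FULFILLABLE) {
            unfulfillableSince = now;
        }
    }

    long getUnfulfillableSince() {
        return unfulfillableSince;
    }
}

final class TimeoutCheckSketch {
    // Same shape as the check in the diff: time out once the bulk has been
    // unfulfillable for at least timeoutMillis.
    static boolean isTimedOut(BulkTimestampSketch bulk, boolean fulfillable, long now, long timeoutMillis) {
        if (fulfillable) {
            bulk.markFulfillable();
            return false;
        }
        bulk.markUnfulfillable(now);
        return bulk.getUnfulfillableSince() + timeoutMillis <= now;
    }

    // markOnEveryReschedule = true models the diff under review,
    // false models marking only once in the public scheduling method.
    static boolean simulate(boolean markOnEveryReschedule) {
        final long timeoutMillis = 100;
        BulkTimestampSketch bulk = new BulkTimestampSketch();

        bulk.markUnfulfillable(0);                  // initial scheduling at t=0
        isTimedOut(bulk, true, 100, timeoutMillis); // check at t=100: bulk is fulfillable

        if (markOnEveryReschedule) {
            bulk.markUnfulfillable(100);            // re-scheduling marks it unfulfillable again
        }

        // A slot is lost just before t=200, so the second check sees an unfulfillable bulk.
        return isTimedOut(bulk, false, 200, timeoutMillis);
    }
}
```

   In this sketch `simulate(true)` reports a timeout although the bulk was fulfillable until just before the second check, while `simulate(false)` keeps it pending and starts counting from t=200.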
    

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
##########
@@ -18,131 +18,14 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
-import org.apache.flink.util.clock.Clock;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * This class helps to check the status of physical slot request bulks.
+ * This class tracks a fulfil-ability timeout of a bulk of physical slot requests.
+ *
+ * <p>The bulk gets canceled if the timeout occurs and the bulk is not fulfilled.

Review comment:
       `is not fulfilled` -> `is not fulfillable`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
##########
@@ -41,16 +41,12 @@
 class MergingSharedSlotProfileRetrieverFactory implements SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory {
        private final PreferredLocationsRetriever preferredLocationsRetriever;
 
-       private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever;
-
        private final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever;
 
        MergingSharedSlotProfileRetrieverFactory(

Review comment:
       It seems the extra indentation of all method parameters in this class
was removed unintentionally.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class PhysicalSlotRequestBulkCheckerImpl implements PhysicalSlotRequestBulkChecker {
+
+       private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+       private final Supplier<Set<SlotInfo>> slotsRetriever;
+
+       private final Clock clock;
+
+       PhysicalSlotRequestBulkCheckerImpl(final Supplier<Set<SlotInfo>> slotsRetriever, final Clock clock) {
+               this.slotsRetriever = checkNotNull(slotsRetriever);
+               this.clock = checkNotNull(clock);
+       }
+
+       void start(final ComponentMainThreadExecutor mainThreadExecutor) {
+               this.componentMainThreadExecutor = mainThreadExecutor;
+       }
+
+       @Override
+       public void schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulk bulk, Time timeout) {
+               schedulePendingRequestBulkTimeoutCheck(new PhysicalSlotRequestBulkWithTimestamp(bulk), timeout);
+       }
+
+       private void schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulkWithTimestamp bulk, Time timeout) {
+               bulk.markUnfulfillable(clock.relativeTimeMillis());
+               componentMainThreadExecutor.schedule(() -> {
+                       TimeoutCheckResult result = checkPhysicalSlotRequestBulkTimeout(bulk, timeout);
+
+                       switch (result) {
+                               case PENDING:
+                                       //re-schedule the timeout check
+                                       schedulePendingRequestBulkTimeoutCheck(bulk, timeout);
+                                       break;
+                               case TIMEOUT:
+                                       bulk.getPhysicalSlotRequestBulk().cancel(new TimeoutException("Slot request bulk is not fulfillable!"));
+                                       break;
+                               case FULFILLED:
+                               default:
+                                       // no action to take
+                                       break;
+                       }
+               }, timeout.getSize(), timeout.getUnit());
+       }
+
+       /**
+        * Check the slot request bulk and timeout its requests if it has been unfulfillable for too long.
+        * @param slotRequestBulk bulk of slot requests
+        * @param slotRequestTimeout indicates how long a pending request can be unfulfillable
+        * @return result of the check, indicating the bulk is fulfilled, still pending, or timed out
+        */
+       @VisibleForTesting
+       TimeoutCheckResult checkPhysicalSlotRequestBulkTimeout(
+                       final PhysicalSlotRequestBulkWithTimestamp slotRequestBulk,
+                       final Time slotRequestTimeout) {
+
+               if (slotRequestBulk.getPhysicalSlotRequestBulk().getPendingRequests().isEmpty()) {
+                       return TimeoutCheckResult.FULFILLED;
+               }
+
+               final boolean fulfillable = isSlotRequestBulkFulfillable(slotRequestBulk.getPhysicalSlotRequestBulk(), slotsRetriever);
+               if (fulfillable) {
+                       slotRequestBulk.markFulfillable();
+               } else {
+                       final long currentTimestamp = clock.relativeTimeMillis();
+
+                       slotRequestBulk.markUnfulfillable(currentTimestamp);
+
+                       final long unfulfillableSince = slotRequestBulk.getUnfulfillableSince();
+                       if (unfulfillableSince + slotRequestTimeout.toMilliseconds() <= currentTimestamp) {
+                               return TimeoutCheckResult.TIMEOUT;
+                       }
+               }
+
+               return TimeoutCheckResult.PENDING;
+       }
+
+       /**
+        * Returns whether the given bulk of slot requests are possible to be fulfilled at the same time
+        * with all the reusable slots in the slot pool. A reusable slot means the slot is available or
+        * will not be occupied indefinitely.
+        *
+        * @param slotRequestBulk bulk of slot requests to check
+        * @param slotsRetriever supplies slots to be used for the fulfill-ability check
+        * @return true if the slot requests are possible to be fulfilled, otherwise false
+        */
+       @VisibleForTesting
+       static boolean isSlotRequestBulkFulfillable(
+                       final PhysicalSlotRequestBulk slotRequestBulk,
+                       final Supplier<Set<SlotInfo>> slotsRetriever) {
+
+               final Set<AllocationID> assignedSlots = slotRequestBulk.getAllocationIdsOfFulfilledRequests();
+               final Set<SlotInfo> reusableSlots = getReusableSlots(slotsRetriever, assignedSlots);
+               return areRequestsFulfillableWithSlots(slotRequestBulk.getPendingRequests(), reusableSlots);
+       }
+
+       private static Set<SlotInfo> getReusableSlots(
+                       final Supplier<Set<SlotInfo>> slotsRetriever,
+                       final Set<AllocationID> slotsToExclude) {
+
+               return slotsRetriever.get().stream()
+                       .filter(slotInfo -> !slotInfo.willBeOccupiedIndefinitely())
+                       .filter(slotInfo -> !slotsToExclude.contains(slotInfo.getAllocationId()))
+                       .collect(Collectors.toSet());
+       }
+
+       private static boolean areRequestsFulfillableWithSlots(
+                       final Collection<ResourceProfile> requestResourceProfiles,
+                       final Set<SlotInfo> slots) {
+
+               final Set<SlotInfo> remainingSlots = new HashSet<>(slots);
+               for (ResourceProfile requestResourceProfile : requestResourceProfiles) {
+                       final Optional<SlotInfo> matchedSlot = findMatchingSlotForRequest(requestResourceProfile, remainingSlots);
+                       if (matchedSlot.isPresent()) {
+                               remainingSlots.remove(matchedSlot.get());
+                       } else {
+                               return false;
+                       }
+               }
+               return true;
+       }
+
+       private static Optional<SlotInfo> findMatchingSlotForRequest(
+                       final ResourceProfile requestResourceProfile,
+                       final Collection<SlotInfo> slots) {
+
+               return slots.stream().filter(slot -> slot.getResourceProfile().isMatching(requestResourceProfile)).findFirst();
+       }
+
+       static PhysicalSlotRequestBulkCheckerImpl fromSlotPool(final SlotPool slotPool, final Clock clock) {
+               return new PhysicalSlotRequestBulkCheckerImpl(() -> getAllSlotInfos(slotPool), clock);
+       }
+
+       private static Set<SlotInfo> getAllSlotInfos(SlotPool slotPool) {
+               return Stream
+                       .concat(
+                               slotPool.getAvailableSlotsInformation().stream(),
+                               slotPool.getAllocatedSlotsInformation().stream())
+                       .collect(Collectors.toSet());
+       }
+
+       enum TimeoutCheckResult {
+               PENDING,
+
+               FULFILLED,
+
+               TIMEOUT
+       }
+
+       @VisibleForTesting
+       static class PhysicalSlotRequestBulkWithTimestamp {

Review comment:
       How about letting it implement `PhysicalSlotRequestBulk`, to simplify
its usages and hide the internal `PhysicalSlotRequestBulk`?
   And what about moving it into a separate class file? I can see we already
have a separate test class for it.
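
   A rough sketch of what this could look like, using delegation. Everything here is illustrative: `BulkSketch` is a trimmed-down, hypothetical stand-in for `PhysicalSlotRequestBulk` (requests are plain strings), and `TimestampedBulkSketch` plays the role of `PhysicalSlotRequestBulkWithTimestamp`. Callers then no longer need a `getPhysicalSlotRequestBulk()` accessor.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical, trimmed-down stand-in for the PhysicalSlotRequestBulk interface.
interface BulkSketch {
    Collection<String> getPendingRequests();

    void cancel(Throwable cause);
}

// Minimal bulk used only to exercise the wrapper below.
final class SimpleBulkSketch implements BulkSketch {
    private final List<String> pending = new ArrayList<>();
    Throwable cancelCause;

    SimpleBulkSketch(Collection<String> pendingRequests) {
        pending.addAll(pendingRequests);
    }

    @Override
    public Collection<String> getPendingRequests() {
        return pending;
    }

    @Override
    public void cancel(Throwable cause) {
        cancelCause = cause;
        pending.clear();
    }
}

// Wrapper that implements the same interface and keeps the wrapped bulk private.
final class TimestampedBulkSketch implements BulkSketch {
    private static final long FULFILLABLE = Long.MAX_VALUE;

    private final BulkSketch delegate;
    private long unfulfillableSince = FULFILLABLE;

    TimestampedBulkSketch(BulkSketch delegate) {
        this.delegate = delegate;
    }

    @Override
    public Collection<String> getPendingRequests() {
        return delegate.getPendingRequests();
    }

    @Override
    public void cancel(Throwable cause) {
        delegate.cancel(cause);
    }

    void markFulfillable() {
        unfulfillableSince = FULFILLABLE;
    }

    // Records the timestamp only on the transition from fulfillable to unfulfillable.
    void markUnfulfillable(long now) {
        if (unfulfillableSince == FULFILLABLE) {
            unfulfillableSince = now;
        }
    }

    long getUnfulfillableSince() {
        return unfulfillableSince;
    }
}
```

   With this shape, the checker could call `bulk.getPendingRequests()` and `bulk.cancel(...)` directly on the wrapper.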

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
##########
@@ -98,14 +103,16 @@ public SchedulerImpl(
                        "Scheduler is not initialized with proper main thread executor. " +
                                "Call to Scheduler.start(...) required.");
 
-               this.bulkSlotProvider = new BulkSlotProviderImpl(slotSelectionStrategy, slotPool);
+               this.slotRequestBulkChecker = PhysicalSlotRequestBulkCheckerImpl.fromSlotPool(slotPool, SystemClock.getInstance());
+               this.bulkSlotProvider = new BulkSlotProviderImpl(slotSelectionStrategy, slotPool, slotRequestBulkChecker);
+               this.physicalSlotProvider = new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);

Review comment:
       The `PhysicalSlotProviderImpl` ctor will invoke 
`slotPool.disableBatchSlotRequestTimeoutCheck()` unconditionally.
   Maybe we should move that invocation to 
`PhysicalSlotProviderImpl#requestNewSlot()`?
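
   As an illustration of the idea only: in the sketch below `SlotPoolSketch` and `LazyProviderSketch` are hypothetical stand-ins, and the `requestNewSlot(String)` signature is made up. The point is that constructing the provider leaves the pool untouched, and the timeout check is only disabled once a slot is actually requested through it.

```java
// Hypothetical stand-in for the slot pool; only the flag relevant here is modelled.
final class SlotPoolSketch {
    private boolean batchTimeoutCheckEnabled = true;

    void disableBatchSlotRequestTimeoutCheck() {
        batchTimeoutCheckEnabled = false;
    }

    boolean isBatchTimeoutCheckEnabled() {
        return batchTimeoutCheckEnabled;
    }
}

// Hypothetical stand-in for PhysicalSlotProviderImpl with the side effect moved
// out of the constructor.
final class LazyProviderSketch {
    private final SlotPoolSketch slotPool;

    LazyProviderSketch(SlotPoolSketch slotPool) {
        // No side effect on the pool at construction time.
        this.slotPool = slotPool;
    }

    // Made-up signature; the real method takes and returns different types.
    String requestNewSlot(String requestId) {
        // The call is idempotent here, so repeating it on every request is harmless.
        slotPool.disableBatchSlotRequestTimeoutCheck();
        return "slot-for-" + requestId;
    }
}
```

   A scheduler that creates the provider but never requests a slot through it would then keep the pool's own batch timeout check active.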

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkImpl.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a bulk of physical slot requests.
+ */
+class PhysicalSlotRequestBulkImpl implements PhysicalSlotRequestBulk {

Review comment:
       I think this class can also be removed when we remove `BulkSlotProvider` 
and `OneSlotPerExecutionSlotAllocator`.
   But it is used in tests of `PhysicalSlotRequestBulkCheckerImpl` and 
`PhysicalSlotRequestBulkWithTimestamp`.
   How about adding a separate `TestingPhysicalSlotRequestBulk` for those 
tests? 
   I'm also fine if we do it in a separate task or when we are removing the 
`BulkSlotProvider`.



