tillrohrmann commented on a change in pull request #13181:
URL: https://github.com/apache/flink/pull/13181#discussion_r480001862



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImpl.java
##########
@@ -169,44 +146,12 @@ public void cancelSlotRequest(SlotRequestId 
slotRequestId, Throwable cause) {
                }
        }
 
-       private void schedulePendingRequestBulkTimeoutCheck(
-                       final PhysicalSlotRequestBulk slotRequestBulk,
-                       final Time timeout) {
-
-               componentMainThreadExecutor.schedule(() -> {
-                       final PhysicalSlotRequestBulkChecker.TimeoutCheckResult 
result =
-                               
slotRequestBulkChecker.checkPhysicalSlotRequestBulkTimeout(slotRequestBulk, 
timeout);
-
-                       switch (result) {
-                               case PENDING:
-                                       //re-schedule the timeout check
-                                       
schedulePendingRequestBulkTimeoutCheck(slotRequestBulk, timeout);
-                                       break;
-                               case TIMEOUT:
-                                       timeoutSlotRequestBulk(slotRequestBulk);
-                                       break;
-                               default: // no action to take
-                       }
-               }, timeout.getSize(), timeout.getUnit());
-       }
-
-       private void timeoutSlotRequestBulk(final PhysicalSlotRequestBulk 
slotRequestBulk) {
-               final Exception cause = new TimeoutException("Slot request bulk 
is not fulfillable!");
-               // pending requests must be canceled first otherwise they might 
be fulfilled by
-               // allocated slots released from this bulk
-               for (SlotRequestId slotRequestId : 
slotRequestBulk.getPendingRequests().keySet()) {
-                       cancelSlotRequest(slotRequestId, cause);
-               }
-               for (SlotRequestId slotRequestId : 
slotRequestBulk.getFulfilledRequests().keySet()) {
-                       cancelSlotRequest(slotRequestId, cause);
-               }
-       }
-
-       private Set<SlotInfo> getAllSlotInfos() {
-               return Stream
-                       .concat(
-                               
slotPool.getAvailableSlotsInformation().stream(),
-                               
slotPool.getAllocatedSlotsInformation().stream())
-                       .collect(Collectors.toSet());
+       private PhysicalSlotRequestBulkImpl createPhysicalSlotRequestBulk(final 
Collection<PhysicalSlotRequest> physicalSlotRequests) {
+               final PhysicalSlotRequestBulkImpl slotRequestBulk = new 
PhysicalSlotRequestBulkImpl(physicalSlotRequests
+                       .stream()
+                       .collect(Collectors.toMap(
+                               PhysicalSlotRequest::getSlotRequestId,
+                               r -> 
r.getSlotProfile().getPhysicalSlotResourceProfile())), this::cancelSlotRequest);

Review comment:
       Nit: the formatting is a bit off here.
   
   ```suggestion
                final PhysicalSlotRequestBulkImpl slotRequestBulk = new 
PhysicalSlotRequestBulkImpl(
                    physicalSlotRequests
                            .stream()
                            .collect(Collectors.toMap(
                                    PhysicalSlotRequest::getSlotRequestId,
                                    r -> 
r.getSlotProfile().getPhysicalSlotResourceProfile())), 
                    this::cancelSlotRequest);
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BulkSlotProviderImpl.java
##########
@@ -49,50 +43,33 @@
 
        private static final Logger LOG = 
LoggerFactory.getLogger(BulkSlotProviderImpl.class);
 
-       private ComponentMainThreadExecutor componentMainThreadExecutor;
-
        private final SlotSelectionStrategy slotSelectionStrategy;
 
        private final SlotPool slotPool;
 
        private final PhysicalSlotRequestBulkChecker slotRequestBulkChecker;
 
-       BulkSlotProviderImpl(final SlotSelectionStrategy slotSelectionStrategy, 
final SlotPool slotPool) {
+       BulkSlotProviderImpl(
+                       final SlotSelectionStrategy slotSelectionStrategy,
+                       final SlotPool slotPool,
+                       final PhysicalSlotRequestBulkChecker 
slotRequestBulkChecker) {
                this.slotSelectionStrategy = 
checkNotNull(slotSelectionStrategy);
                this.slotPool = checkNotNull(slotPool);
-
-               this.slotRequestBulkChecker = new 
PhysicalSlotRequestBulkChecker(
-                       this::getAllSlotInfos,
-                       SystemClock.getInstance());
-
-               this.componentMainThreadExecutor = new 
ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
-                       "Scheduler is not initialized with proper main thread 
executor. " +
-                               "Call to BulkSlotProvider.start(...) 
required.");
-       }
-
-       @Override
-       public void start(final ComponentMainThreadExecutor mainThreadExecutor) 
{
-               this.componentMainThreadExecutor = mainThreadExecutor;
+               this.slotRequestBulkChecker = slotRequestBulkChecker;

Review comment:
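       The null checks are not consistent in this class: `slotSelectionStrategy` and `slotPool` are wrapped in `checkNotNull`, but `slotRequestBulkChecker` is assigned without a check.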

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
##########
@@ -18,131 +18,24 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
-import org.apache.flink.util.clock.Clock;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * This class helps to check the status of physical slot request bulks.
+ * This class tracks a fulfil-ability timeout of a bulk of physical slot 
requests.
+ *
+ * <p>The check stops when all physical slot requests of the bulk are 
fulfilled by available or newly allocated slots.
+ * The bulk is fulfillable if all its physical slot requests can be fulfilled 
either by available or
+ * newly allocated slots or slots which currently used by other job subtasks.
+ * The bulk gets canceled if the timeout occurs and the bulk is not 
fulfillable.
+ * The timeout does not tick while the bulk is fulfillable but not fulfilled 
yet.

Review comment:
       tick -> trigger?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -161,158 +173,66 @@ private SharedSlot getOrAllocateSharedSlot(
                                        .thenCompose(slotProfile -> 
slotProvider.allocatePhysicalSlot(
                                                new 
PhysicalSlotRequest(physicalSlotRequestId, slotProfile, 
slotWillBeOccupiedIndefinitely)))
                                        
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
-                               return new SharedSlot(physicalSlotRequestId, 
physicalSlotResourceProfile, group, physicalSlotFuture);
+                               return new SharedSlot(
+                                       physicalSlotRequestId,
+                                       physicalSlotResourceProfile,
+                                       group,
+                                       physicalSlotFuture,
+                                       slotWillBeOccupiedIndefinitely,
+                                       this::releaseSharedSlot);
                        });
        }
 
+       private void releaseSharedSlot(ExecutionSlotSharingGroup 
executionSlotSharingGroup) {
+               SharedSlot slot = sharedSlots.remove(executionSlotSharingGroup);
+               if (slot != null) {
+                       slotProvider.cancelSlotRequest(
+                               slot.getPhysicalSlotRequestId(),
+                               new FlinkException("Slot is being returned from 
SlotSharingExecutionSlotAllocator."));
+               } else {
+                       LOG.debug("There is no slot for 
ExecutionSlotSharingGroup {} to release", executionSlotSharingGroup);
+               }
+       }
+
        private ResourceProfile 
getPhysicalSlotResourceProfile(ExecutionSlotSharingGroup 
executionSlotSharingGroup) {
                return executionSlotSharingGroup
                        .getExecutionVertexIds()
                        .stream()
                        .reduce(ResourceProfile.ZERO, (r, e) -> 
r.merge(resourceProfileRetriever.apply(e)), ResourceProfile::merge);
        }
 
-       private class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
-               private final SlotRequestId physicalSlotRequestId;
-
-               private final ResourceProfile physicalSlotResourceProfile;
-
-               private final ExecutionSlotSharingGroup 
executionSlotSharingGroup;
-
-               private final CompletableFuture<PhysicalSlot> slotContextFuture;
-
-               private final DualKeyLinkedMap<ExecutionVertexID, 
SlotRequestId, CompletableFuture<SingleLogicalSlot>> requestedLogicalSlots;
-
-               private SharedSlot(
-                               SlotRequestId physicalSlotRequestId,
-                               ResourceProfile physicalSlotResourceProfile,
-                               ExecutionSlotSharingGroup 
executionSlotSharingGroup,
-                               CompletableFuture<PhysicalSlot> 
slotContextFuture) {
-                       this.physicalSlotRequestId = physicalSlotRequestId;
-                       this.physicalSlotResourceProfile = 
physicalSlotResourceProfile;
-                       this.executionSlotSharingGroup = 
executionSlotSharingGroup;
-                       this.slotContextFuture = 
slotContextFuture.thenApply(physicalSlot -> {
-                               Preconditions.checkState(
-                                       physicalSlot.tryAssignPayload(this),
-                                       "Unexpected physical slot payload 
assignment failure!");
-                               return physicalSlot;
+       private SharingPhysicalSlotRequestBulk 
createBulk(Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executions) {
+               Map<ExecutionSlotSharingGroup, ResourceProfile> pendingRequests 
= executions
+                       .keySet()
+                       .stream()
+                       .collect(Collectors.toMap(
+                               group -> group,
+                               group -> 
sharedSlots.get(group).getPhysicalSlotResourceProfile()
+                       ));
+               Map<ExecutionSlotSharingGroup, AllocationID> fulfilledRequests 
= new HashMap<>();
+               SharingPhysicalSlotRequestBulk bulk = new 
SharingPhysicalSlotRequestBulk(
+                       executions,
+                       pendingRequests,
+                       fulfilledRequests,
+                       (executionVertexId, cause) -> {
+                               ExecutionSlotSharingGroup group = 
slotSharingStrategy.getExecutionSlotSharingGroup(executionVertexId);
+                               
sharedSlots.get(group).cancelLogicalSlotRequest(executionVertexId, cause);

Review comment:
       Here I would suggest adding a `checkState` that `sharedSlots.get(group)` 
actually returns an item.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class PhysicalSlotRequestBulkCheckerImpl implements 
PhysicalSlotRequestBulkChecker {
+
+       private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+       private final Supplier<Set<SlotInfo>> slotsRetriever;
+
+       private final Clock clock;
+
+       PhysicalSlotRequestBulkCheckerImpl(final Supplier<Set<SlotInfo>> 
slotsRetriever, final Clock clock) {
+               this.slotsRetriever = checkNotNull(slotsRetriever);
+               this.clock = checkNotNull(clock);
+       }
+
+       void start(final ComponentMainThreadExecutor mainThreadExecutor) {
+               this.componentMainThreadExecutor = mainThreadExecutor;
+       }
+
+       @Override
+       public void 
schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulk bulk, Time 
timeout) {
+               PhysicalSlotRequestBulkWithTimestamp bulkWithTimestamp = new 
PhysicalSlotRequestBulkWithTimestamp(bulk);
+               bulkWithTimestamp.markUnfulfillable(clock.relativeTimeMillis());
+               schedulePendingRequestBulkTimeoutCheck(bulkWithTimestamp, 
timeout);
+       }
+
+       private void 
schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulkWithTimestamp 
bulk, Time timeout) {
+               componentMainThreadExecutor.schedule(() -> {
+                       TimeoutCheckResult result = 
checkPhysicalSlotRequestBulkTimeout(bulk, timeout);
+
+                       switch (result) {
+                               case PENDING:
+                                       //re-schedule the timeout check
+                                       
schedulePendingRequestBulkTimeoutCheck(bulk, timeout);
+                                       break;
+                               case TIMEOUT:
+                                       bulk.cancel(new TimeoutException("Slot 
request bulk is not fulfillable!"));
+                                       break;
+                               case FULFILLED:
+                               default:
+                                       // no action to take
+                                       break;
+                       }
+               }, timeout.getSize(), timeout.getUnit());
+       }
+
+       /**
+        * Check the slot request bulk and timeout its requests if it has been 
unfulfillable for too long.
+        * @param slotRequestBulk bulk of slot requests
+        * @param slotRequestTimeout indicates how long a pending request can 
be unfulfillable
+        * @return result of the check, indicating the bulk is fulfilled, still 
pending, or timed out
+        */
+       @VisibleForTesting
+       TimeoutCheckResult checkPhysicalSlotRequestBulkTimeout(
+                       final PhysicalSlotRequestBulkWithTimestamp 
slotRequestBulk,
+                       final Time slotRequestTimeout) {
+
+               if (slotRequestBulk.getPendingRequests().isEmpty()) {
+                       return TimeoutCheckResult.FULFILLED;
+               }
+
+               final boolean fulfillable = 
isSlotRequestBulkFulfillable(slotRequestBulk, slotsRetriever);
+               if (fulfillable) {
+                       slotRequestBulk.markFulfillable();
+               } else {
+                       final long currentTimestamp = 
clock.relativeTimeMillis();
+
+                       slotRequestBulk.markUnfulfillable(currentTimestamp);
+
+                       final long unfulfillableSince = 
slotRequestBulk.getUnfulfillableSince();
+                       if (unfulfillableSince + 
slotRequestTimeout.toMilliseconds() <= currentTimestamp) {
+                               return TimeoutCheckResult.TIMEOUT;
+                       }
+               }
+
+               return TimeoutCheckResult.PENDING;
+       }
+
+       /**
+        * Returns whether the given bulk of slot requests are possible to be 
fulfilled at the same time
+        * with all the reusable slots in the slot pool. A reusable slot means 
the slot is available or
+        * will not be occupied indefinitely.
+        *
+        * @param slotRequestBulk bulk of slot requests to check
+        * @param slotsRetriever supplies slots to be used for the 
fulfill-ability check
+        * @return true if the slot requests are possible to be fulfilled, 
otherwise false
+        */
+       @VisibleForTesting
+       static boolean isSlotRequestBulkFulfillable(
+                       final PhysicalSlotRequestBulk slotRequestBulk,
+                       final Supplier<Set<SlotInfo>> slotsRetriever) {
+
+               final Set<AllocationID> assignedSlots = 
slotRequestBulk.getAllocationIdsOfFulfilledRequests();
+               final Set<SlotInfo> reusableSlots = 
getReusableSlots(slotsRetriever, assignedSlots);
+               return 
areRequestsFulfillableWithSlots(slotRequestBulk.getPendingRequests(), 
reusableSlots);
+       }
+
+       private static Set<SlotInfo> getReusableSlots(
+                       final Supplier<Set<SlotInfo>> slotsRetriever,
+                       final Set<AllocationID> slotsToExclude) {
+
+               return slotsRetriever.get().stream()
+                       .filter(slotInfo -> 
!slotInfo.willBeOccupiedIndefinitely())
+                       .filter(slotInfo -> 
!slotsToExclude.contains(slotInfo.getAllocationId()))
+                       .collect(Collectors.toSet());
+       }
+
+       private static boolean areRequestsFulfillableWithSlots(
+                       final Collection<ResourceProfile> 
requestResourceProfiles,
+                       final Set<SlotInfo> slots) {
+
+               final Set<SlotInfo> remainingSlots = new HashSet<>(slots);
+               for (ResourceProfile requestResourceProfile : 
requestResourceProfiles) {
+                       final Optional<SlotInfo> matchedSlot = 
findMatchingSlotForRequest(requestResourceProfile, remainingSlots);
+                       if (matchedSlot.isPresent()) {
+                               remainingSlots.remove(matchedSlot.get());
+                       } else {
+                               return false;
+                       }
+               }
+               return true;
+       }
+
+       private static Optional<SlotInfo> findMatchingSlotForRequest(
+                       final ResourceProfile requestResourceProfile,
+                       final Collection<SlotInfo> slots) {
+
+               return slots.stream().filter(slot -> 
slot.getResourceProfile().isMatching(requestResourceProfile)).findFirst();
+       }
+
+       static PhysicalSlotRequestBulkCheckerImpl fromSlotPool(final SlotPool 
slotPool, final Clock clock) {
+               return new PhysicalSlotRequestBulkCheckerImpl(() -> 
getAllSlotInfos(slotPool), clock);
+       }
+
+       private static Set<SlotInfo> getAllSlotInfos(SlotPool slotPool) {
+               return Stream
+                       .concat(
+                               
slotPool.getAvailableSlotsInformation().stream(),
+                               
slotPool.getAllocatedSlotsInformation().stream())
+                       .collect(Collectors.toSet());

Review comment:
       I think `slotPool.getAllocatedSlotsInformation` should already give you 
all slot infos.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharingPhysicalSlotRequestBulk.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulk;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SharingPhysicalSlotRequestBulk implements PhysicalSlotRequestBulk {
+       private final Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> 
executions;
+
+       private final Map<ExecutionSlotSharingGroup, ResourceProfile> 
pendingRequests;
+
+       private final Map<ExecutionSlotSharingGroup, AllocationID> 
fulfilledRequests;
+
+       private final BiConsumer<ExecutionVertexID, Throwable> 
logicalSlotRequestCanceller;
+
+       SharingPhysicalSlotRequestBulk(
+                       Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> 
executions,
+                       Map<ExecutionSlotSharingGroup, ResourceProfile> 
pendingRequests,
+                       Map<ExecutionSlotSharingGroup, AllocationID> 
fulfilledRequests,
+                       BiConsumer<ExecutionVertexID, Throwable> 
logicalSlotRequestCanceller) {
+               this.executions = checkNotNull(executions);
+               this.pendingRequests = checkNotNull(pendingRequests);
+               this.fulfilledRequests = checkNotNull(fulfilledRequests);
+               this.logicalSlotRequestCanceller = 
checkNotNull(logicalSlotRequestCanceller);
+       }
+
+       @Override
+       public Collection<ResourceProfile> getPendingRequests() {
+               return pendingRequests.values();
+       }
+
+       @Override
+       public Set<AllocationID> getAllocationIdsOfFulfilledRequests() {
+               return new HashSet<>(fulfilledRequests.values());
+       }
+
+       @Override
+       public void cancel(Throwable cause) {
+               // pending requests must be canceled first otherwise they might 
be fulfilled by
+               // allocated slots released from this bulk
+               Stream
+                       .concat(
+                               pendingRequests.keySet().stream(),
+                               fulfilledRequests.keySet().stream())
+                       .flatMap(group -> executions.get(group).stream())
+                       .forEach(id -> logicalSlotRequestCanceller.accept(id, 
cause));

Review comment:
       Call me old-fashioned, but I like to follow the KISS principle and use 
a for-each loop here instead of wrapping everything in a stream and then 
calling a non-pure function in `forEach`. I think the streams API has its value 
in transforming streams via pure functions into something else, but it should 
not be used for computations with side effects.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -113,27 +122,30 @@
 
                SharedSlotProfileRetriever sharedSlotProfileRetriever = 
sharedSlotProfileRetrieverFactory
                        .createFromBulk(new HashSet<>(executionVertexIds));
-               Map<ExecutionVertexID, SlotExecutionVertexAssignment> 
assignments = executionVertexIds
+               Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> 
executionsByGroup = executionVertexIds
                        .stream()
-                       
.collect(Collectors.groupingBy(slotSharingStrategy::getExecutionSlotSharingGroup))
+                       
.collect(Collectors.groupingBy(slotSharingStrategy::getExecutionSlotSharingGroup));
+               Map<ExecutionVertexID, SlotExecutionVertexAssignment> 
assignments = executionsByGroup
                        .entrySet()
                        .stream()
                        .flatMap(entry -> 
allocateLogicalSlotsFromSharedSlot(sharedSlotProfileRetriever, entry.getKey(), 
entry.getValue()))
                        
.collect(Collectors.toMap(SlotExecutionVertexAssignment::getExecutionVertexId, 
a -> a));

Review comment:
       I am not convinced that using the stream API here is the right choice. 
What we are doing here is performing computations with side effects. For that 
I would always prefer a traditional for-each loop.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkChecker.java
##########
@@ -18,131 +18,24 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmaster.SlotInfo;
-import org.apache.flink.util.clock.Clock;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * This class helps to check the status of physical slot request bulks.
+ * This class tracks a fulfil-ability timeout of a bulk of physical slot 
requests.

Review comment:
       Nit: I would suggest to decide whether to write `fulfil` or `fulfill` 
and to be consistent with spelling.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class PhysicalSlotRequestBulkCheckerImpl implements 
PhysicalSlotRequestBulkChecker {
+
+       private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+       private final Supplier<Set<SlotInfo>> slotsRetriever;
+
+       private final Clock clock;
+
+       PhysicalSlotRequestBulkCheckerImpl(final Supplier<Set<SlotInfo>> 
slotsRetriever, final Clock clock) {
+               this.slotsRetriever = checkNotNull(slotsRetriever);
+               this.clock = checkNotNull(clock);
+       }
+
+       void start(final ComponentMainThreadExecutor mainThreadExecutor) {
+               this.componentMainThreadExecutor = mainThreadExecutor;
+       }
+
+       @Override
+       public void 
schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulk bulk, Time 
timeout) {
+               PhysicalSlotRequestBulkWithTimestamp bulkWithTimestamp = new 
PhysicalSlotRequestBulkWithTimestamp(bulk);
+               bulkWithTimestamp.markUnfulfillable(clock.relativeTimeMillis());
+               schedulePendingRequestBulkTimeoutCheck(bulkWithTimestamp, 
timeout);
+       }
+
+       private void 
schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulkWithTimestamp 
bulk, Time timeout) {
+               componentMainThreadExecutor.schedule(() -> {
+                       TimeoutCheckResult result = 
checkPhysicalSlotRequestBulkTimeout(bulk, timeout);
+
+                       switch (result) {
+                               case PENDING:
+                                       //re-schedule the timeout check
+                                       
schedulePendingRequestBulkTimeoutCheck(bulk, timeout);
+                                       break;
+                               case TIMEOUT:
+                                       bulk.cancel(new TimeoutException("Slot 
request bulk is not fulfillable!"));
+                                       break;
+                               case FULFILLED:
+                               default:
+                                       // no action to take
+                                       break;
+                       }
+               }, timeout.getSize(), timeout.getUnit());
+       }
+
+       /**
+        * Check the slot request bulk and timeout its requests if it has been 
unfulfillable for too long.
+        * @param slotRequestBulk bulk of slot requests
+        * @param slotRequestTimeout indicates how long a pending request can 
be unfulfillable
+        * @return result of the check, indicating the bulk is fulfilled, still 
pending, or timed out
+        */
+       @VisibleForTesting
+       TimeoutCheckResult checkPhysicalSlotRequestBulkTimeout(
+                       final PhysicalSlotRequestBulkWithTimestamp 
slotRequestBulk,
+                       final Time slotRequestTimeout) {
+
+               if (slotRequestBulk.getPendingRequests().isEmpty()) {
+                       return TimeoutCheckResult.FULFILLED;
+               }
+
+               final boolean fulfillable = 
isSlotRequestBulkFulfillable(slotRequestBulk, slotsRetriever);
+               if (fulfillable) {
+                       slotRequestBulk.markFulfillable();
+               } else {
+                       final long currentTimestamp = 
clock.relativeTimeMillis();
+
+                       slotRequestBulk.markUnfulfillable(currentTimestamp);
+
+                       final long unfulfillableSince = 
slotRequestBulk.getUnfulfillableSince();
+                       if (unfulfillableSince + 
slotRequestTimeout.toMilliseconds() <= currentTimestamp) {
+                               return TimeoutCheckResult.TIMEOUT;
+                       }
+               }
+
+               return TimeoutCheckResult.PENDING;
+       }
+
+       /**
+        * Returns whether the given bulk of slot requests are possible to be 
fulfilled at the same time
+        * with all the reusable slots in the slot pool. A reusable slot means 
the slot is available or
+        * will not be occupied indefinitely.
+        *
+        * @param slotRequestBulk bulk of slot requests to check
+        * @param slotsRetriever supplies slots to be used for the 
fulfill-ability check
+        * @return true if the slot requests are possible to be fulfilled, 
otherwise false
+        */
+       @VisibleForTesting
+       static boolean isSlotRequestBulkFulfillable(
+                       final PhysicalSlotRequestBulk slotRequestBulk,
+                       final Supplier<Set<SlotInfo>> slotsRetriever) {
+
+               final Set<AllocationID> assignedSlots = 
slotRequestBulk.getAllocationIdsOfFulfilledRequests();
+               final Set<SlotInfo> reusableSlots = 
getReusableSlots(slotsRetriever, assignedSlots);
+               return 
areRequestsFulfillableWithSlots(slotRequestBulk.getPendingRequests(), 
reusableSlots);
+       }
+
+       private static Set<SlotInfo> getReusableSlots(
+                       final Supplier<Set<SlotInfo>> slotsRetriever,
+                       final Set<AllocationID> slotsToExclude) {
+
+               return slotsRetriever.get().stream()
+                       .filter(slotInfo -> 
!slotInfo.willBeOccupiedIndefinitely())
+                       .filter(slotInfo -> 
!slotsToExclude.contains(slotInfo.getAllocationId()))
+                       .collect(Collectors.toSet());
+       }
+
+       private static boolean areRequestsFulfillableWithSlots(
+                       final Collection<ResourceProfile> 
requestResourceProfiles,
+                       final Set<SlotInfo> slots) {
+
+               final Set<SlotInfo> remainingSlots = new HashSet<>(slots);
+               for (ResourceProfile requestResourceProfile : 
requestResourceProfiles) {
+                       final Optional<SlotInfo> matchedSlot = 
findMatchingSlotForRequest(requestResourceProfile, remainingSlots);
+                       if (matchedSlot.isPresent()) {
+                               remainingSlots.remove(matchedSlot.get());
+                       } else {
+                               return false;
+                       }
+               }
+               return true;
+       }
+
+       private static Optional<SlotInfo> findMatchingSlotForRequest(
+                       final ResourceProfile requestResourceProfile,
+                       final Collection<SlotInfo> slots) {
+
+               return slots.stream().filter(slot -> 
slot.getResourceProfile().isMatching(requestResourceProfile)).findFirst();
+       }
+
+       static PhysicalSlotRequestBulkCheckerImpl fromSlotPool(final SlotPool 
slotPool, final Clock clock) {

Review comment:
       ```suggestion
        static PhysicalSlotRequestBulkCheckerImpl create(final SlotPool 
slotPool, final Clock clock) {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class PhysicalSlotRequestBulkCheckerImpl implements 
PhysicalSlotRequestBulkChecker {
+
+       private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+       private final Supplier<Set<SlotInfo>> slotsRetriever;
+
+       private final Clock clock;
+
+       PhysicalSlotRequestBulkCheckerImpl(final Supplier<Set<SlotInfo>> 
slotsRetriever, final Clock clock) {
+               this.slotsRetriever = checkNotNull(slotsRetriever);
+               this.clock = checkNotNull(clock);
+       }
+
+       void start(final ComponentMainThreadExecutor mainThreadExecutor) {
+               this.componentMainThreadExecutor = mainThreadExecutor;
+       }
+
+       @Override
+       public void 
schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulk bulk, Time 
timeout) {
+               PhysicalSlotRequestBulkWithTimestamp bulkWithTimestamp = new 
PhysicalSlotRequestBulkWithTimestamp(bulk);
+               bulkWithTimestamp.markUnfulfillable(clock.relativeTimeMillis());
+               schedulePendingRequestBulkTimeoutCheck(bulkWithTimestamp, 
timeout);
+       }
+
+       private void 
schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulkWithTimestamp 
bulk, Time timeout) {
+               componentMainThreadExecutor.schedule(() -> {
+                       TimeoutCheckResult result = 
checkPhysicalSlotRequestBulkTimeout(bulk, timeout);
+
+                       switch (result) {
+                               case PENDING:
+                                       //re-schedule the timeout check
+                                       
schedulePendingRequestBulkTimeoutCheck(bulk, timeout);
+                                       break;
+                               case TIMEOUT:
+                                       bulk.cancel(new TimeoutException("Slot 
request bulk is not fulfillable!"));
+                                       break;
+                               case FULFILLED:
+                               default:
+                                       // no action to take
+                                       break;
+                       }
+               }, timeout.getSize(), timeout.getUnit());
+       }
+
+       /**
+        * Check the slot request bulk and timeout its requests if it has been 
unfulfillable for too long.
+        * @param slotRequestBulk bulk of slot requests
+        * @param slotRequestTimeout indicates how long a pending request can 
be unfulfillable
+        * @return result of the check, indicating the bulk is fulfilled, still 
pending, or timed out
+        */
+       @VisibleForTesting
+       TimeoutCheckResult checkPhysicalSlotRequestBulkTimeout(
+                       final PhysicalSlotRequestBulkWithTimestamp 
slotRequestBulk,
+                       final Time slotRequestTimeout) {
+
+               if (slotRequestBulk.getPendingRequests().isEmpty()) {
+                       return TimeoutCheckResult.FULFILLED;
+               }
+
+               final boolean fulfillable = 
isSlotRequestBulkFulfillable(slotRequestBulk, slotsRetriever);
+               if (fulfillable) {
+                       slotRequestBulk.markFulfillable();
+               } else {
+                       final long currentTimestamp = 
clock.relativeTimeMillis();
+
+                       slotRequestBulk.markUnfulfillable(currentTimestamp);
+
+                       final long unfulfillableSince = 
slotRequestBulk.getUnfulfillableSince();
+                       if (unfulfillableSince + 
slotRequestTimeout.toMilliseconds() <= currentTimestamp) {
+                               return TimeoutCheckResult.TIMEOUT;
+                       }
+               }
+
+               return TimeoutCheckResult.PENDING;
+       }
+
+       /**
+        * Returns whether the given bulk of slot requests are possible to be 
fulfilled at the same time
+        * with all the reusable slots in the slot pool. A reusable slot means 
the slot is available or
+        * will not be occupied indefinitely.
+        *
+        * @param slotRequestBulk bulk of slot requests to check
+        * @param slotsRetriever supplies slots to be used for the 
fulfill-ability check
+        * @return true if the slot requests are possible to be fulfilled, 
otherwise false
+        */
+       @VisibleForTesting
+       static boolean isSlotRequestBulkFulfillable(
+                       final PhysicalSlotRequestBulk slotRequestBulk,
+                       final Supplier<Set<SlotInfo>> slotsRetriever) {
+
+               final Set<AllocationID> assignedSlots = 
slotRequestBulk.getAllocationIdsOfFulfilledRequests();
+               final Set<SlotInfo> reusableSlots = 
getReusableSlots(slotsRetriever, assignedSlots);
+               return 
areRequestsFulfillableWithSlots(slotRequestBulk.getPendingRequests(), 
reusableSlots);
+       }
+
+       private static Set<SlotInfo> getReusableSlots(
+                       final Supplier<Set<SlotInfo>> slotsRetriever,
+                       final Set<AllocationID> slotsToExclude) {
+
+               return slotsRetriever.get().stream()
+                       .filter(slotInfo -> 
!slotInfo.willBeOccupiedIndefinitely())
+                       .filter(slotInfo -> 
!slotsToExclude.contains(slotInfo.getAllocationId()))
+                       .collect(Collectors.toSet());
+       }
+
+       private static boolean areRequestsFulfillableWithSlots(
+                       final Collection<ResourceProfile> 
requestResourceProfiles,
+                       final Set<SlotInfo> slots) {
+
+               final Set<SlotInfo> remainingSlots = new HashSet<>(slots);
+               for (ResourceProfile requestResourceProfile : 
requestResourceProfiles) {
+                       final Optional<SlotInfo> matchedSlot = 
findMatchingSlotForRequest(requestResourceProfile, remainingSlots);
+                       if (matchedSlot.isPresent()) {
+                               remainingSlots.remove(matchedSlot.get());
+                       } else {
+                               return false;
+                       }
+               }

Review comment:
       We don't have to address it right away, but we should keep in mind that 
this might have quadratic complexity.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
##########
@@ -98,14 +101,15 @@ public SchedulerImpl(
                        "Scheduler is not initialized with proper main thread 
executor. " +
                                "Call to Scheduler.start(...) required.");
 
-               this.bulkSlotProvider = new 
BulkSlotProviderImpl(slotSelectionStrategy, slotPool);
+               this.slotRequestBulkChecker = 
PhysicalSlotRequestBulkCheckerImpl.fromSlotPool(slotPool, 
SystemClock.getInstance());
+               this.bulkSlotProvider = new 
BulkSlotProviderImpl(slotSelectionStrategy, slotPool, slotRequestBulkChecker);

Review comment:
       Looking at the code, wouldn't the change be quite small? If I am not 
mistaken, then it would be enough to change the `schedulerFactory` argument of 
the `JobMaster` into a `SlotProviderFactory`. That way we should be able to 
provide a different `SlotProvider` implementation than `SchedulerImpl` to the 
`JobMaster` and we would not have to make sure that the `SchedulerImpl` 
implements all methods of the `SlotProvider` interface.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImplTest.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotTestUtils.createPhysicalSlot;
+import static 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotTestUtils.occupyPhysicalSlot;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link PhysicalSlotRequestBulkCheckerImpl}.
+ */
+public class PhysicalSlotRequestBulkCheckerImplTest extends TestLogger {
+
+       private static final Time TIMEOUT = Time.milliseconds(100L);

Review comment:
       Could this also be 50ms?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkWithTimestamp.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+import java.util.Collection;
+import java.util.Set;
+
+class PhysicalSlotRequestBulkWithTimestamp implements PhysicalSlotRequestBulk {

Review comment:
       I think this class is being used in previous commits. Hence, the order 
in which the commits are split up does not work.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharingPhysicalSlotRequestBulk.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulk;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SharingPhysicalSlotRequestBulk implements PhysicalSlotRequestBulk {
+       private final Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> 
executions;
+
+       private final Map<ExecutionSlotSharingGroup, ResourceProfile> 
pendingRequests;
+
+       private final Map<ExecutionSlotSharingGroup, AllocationID> 
fulfilledRequests;
+
+       private final BiConsumer<ExecutionVertexID, Throwable> 
logicalSlotRequestCanceller;
+
+       SharingPhysicalSlotRequestBulk(
+                       Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> 
executions,
+                       Map<ExecutionSlotSharingGroup, ResourceProfile> 
pendingRequests,
+                       Map<ExecutionSlotSharingGroup, AllocationID> 
fulfilledRequests,
+                       BiConsumer<ExecutionVertexID, Throwable> 
logicalSlotRequestCanceller) {
+               this.executions = checkNotNull(executions);
+               this.pendingRequests = checkNotNull(pendingRequests);
+               this.fulfilledRequests = checkNotNull(fulfilledRequests);
+               this.logicalSlotRequestCanceller = 
checkNotNull(logicalSlotRequestCanceller);
+       }
+
+       @Override
+       public Collection<ResourceProfile> getPendingRequests() {
+               return pendingRequests.values();
+       }
+
+       @Override
+       public Set<AllocationID> getAllocationIdsOfFulfilledRequests() {
+               return new HashSet<>(fulfilledRequests.values());

Review comment:
       `Collections.unmodifiableSet` might be a bit cheaper here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.util.DualKeyLinkedMap;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+class SharedSlot implements SlotOwner, PhysicalSlot.Payload {

Review comment:
       `JavaDocs` would be good here.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImplTest.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotTestUtils.createPhysicalSlot;
+import static 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotTestUtils.occupyPhysicalSlot;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link PhysicalSlotRequestBulkCheckerImpl}.
+ */
+public class PhysicalSlotRequestBulkCheckerImplTest extends TestLogger {
+
+       private static final Time TIMEOUT = Time.milliseconds(100L);
+
+       private static ScheduledExecutorService 
singleThreadScheduledExecutorService;
+
+       private static ComponentMainThreadExecutor mainThreadExecutor;
+
+       private final ManualClock clock = new ManualClock();
+
+       private PhysicalSlotRequestBulkCheckerImpl bulkChecker;
+
+       private Set<PhysicalSlot> slots;
+
+       private Supplier<Set<SlotInfo>> slotsRetriever;
+
+       @BeforeClass
+       public static void setupClass() {
+               singleThreadScheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+               mainThreadExecutor = 
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadScheduledExecutorService);
+       }
+
+       @AfterClass
+       public static void teardownClass() {
+               if (singleThreadScheduledExecutorService != null) {
+                       singleThreadScheduledExecutorService.shutdownNow();
+               }
+       }
+
+       @Before
+       public void setup() throws Exception {
+               slots = new HashSet<>();
+               slotsRetriever = () -> new HashSet<>(slots);
+               bulkChecker = new 
PhysicalSlotRequestBulkCheckerImpl(slotsRetriever, clock);
+               bulkChecker.start(mainThreadExecutor);
+       }
+
+       @Test
+       public void testPendingBulkIsNotCancelled() throws 
InterruptedException, ExecutionException {
+               final CompletableFuture<SlotRequestId> cancellationFuture = new 
CompletableFuture<>();
+               final PhysicalSlotRequestBulk bulk = 
createPhysicalSlotRequestBulkWithCancellationFuture(cancellationFuture, new 
SlotRequestId());
+               bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, 
TIMEOUT);
+               checkNotCancelledAfter(cancellationFuture, 2 * 
TIMEOUT.toMilliseconds());
+       }
+
+       @Test
+       public void testFulfilledBulkIsNotCancelled() throws 
InterruptedException, ExecutionException {
+               final CompletableFuture<SlotRequestId> cancellationFuture = new 
CompletableFuture<>();
+               final PhysicalSlotRequestBulk bulk = 
createPhysicalSlotRequestBulkWithCancellationFuture(cancellationFuture, new 
SlotRequestId());
+               bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, 
TIMEOUT);
+               checkNotCancelledAfter(cancellationFuture, 2  * 
TIMEOUT.toMilliseconds());
+       }
+
+       private static void checkNotCancelledAfter(
+                       CompletableFuture<?> cancellationFuture, long milli) 
throws ExecutionException, InterruptedException {
+               mainThreadExecutor.schedule(() -> {}, milli, 
TimeUnit.MILLISECONDS).get();
+               try {
+                       assertThat(cancellationFuture.isDone(), is(false));
+                       cancellationFuture.get(milli, TimeUnit.MILLISECONDS);
+                       fail("The future must not have been cancelled");
+               } catch (TimeoutException e) {
+                       assertThat(cancellationFuture.isDone(), is(false));
+               }
+       }
+
+       @Test
+       public void testUnfulfillableBulkIsCancelled() {
+               final CompletableFuture<SlotRequestId> cancellationFuture = new 
CompletableFuture<>();
+               final SlotRequestId slotRequestId = new SlotRequestId();
+               final PhysicalSlotRequestBulk bulk = 
createPhysicalSlotRequestBulkWithCancellationFuture(cancellationFuture, 
slotRequestId);
+               bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, 
TIMEOUT);
+               clock.advanceTime(TIMEOUT.toMilliseconds() + 1L, 
TimeUnit.MILLISECONDS);
+               assertThat(cancellationFuture.join(), is(slotRequestId));
+       }
+
+       @Test
+       public void testBulkFulfilledOnCheck() {
+               final SlotRequestId slotRequestId = new SlotRequestId();
+               final PhysicalSlotRequestBulkImpl bulk = 
createPhysicalSlotRequestBulk(slotRequestId);
+
+               bulk.markRequestFulfilled(slotRequestId, new AllocationID());
+
+               final PhysicalSlotRequestBulkWithTimestamp bulkWithTimestamp = 
new PhysicalSlotRequestBulkWithTimestamp(bulk);
+               assertThat(checkBulkTimeout(bulkWithTimestamp), 
is(PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.FULFILLED));
+       }
+
+       @Test
+       public void testBulkTimeoutOnCheck() {
+               final PhysicalSlotRequestBulkWithTimestamp bulk = 
createPhysicalSlotRequestBulkWithTimestamp(new SlotRequestId());
+
+               clock.advanceTime(TIMEOUT.toMilliseconds() + 1L, 
TimeUnit.MILLISECONDS);
+               assertThat(checkBulkTimeout(bulk), 
is(PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.TIMEOUT));
+       }
+
+       @Test
+       public void testBulkPendingOnCheckIfFulfillable() {
+               final PhysicalSlotRequestBulkWithTimestamp bulk = 
createPhysicalSlotRequestBulkWithTimestamp(new SlotRequestId());
+
+               final PhysicalSlot slot = addOneSlot();
+               occupyPhysicalSlot(slot, false);
+
+               assertThat(checkBulkTimeout(bulk), 
is(PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.PENDING));
+       }
+
+       @Test
+       public void testBulkPendingOnCheckIfUnfulfillableButNotTimedOut() {
+               final PhysicalSlotRequestBulkWithTimestamp bulk = 
createPhysicalSlotRequestBulkWithTimestamp(new SlotRequestId());
+
+               assertThat(checkBulkTimeout(bulk), 
is(PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.PENDING));
+       }
+
+       @Test
+       public void testBulkFulfillable() {
+               final PhysicalSlotRequestBulk bulk = 
createPhysicalSlotRequestBulk(new SlotRequestId());
+
+               addOneSlot();
+
+               assertThat(isFulfillable(bulk), is(true));
+       }
+
+       @Test
+       public void testBulkUnfulfillableWithInsufficientSlots() {
+               final PhysicalSlotRequestBulk bulk = 
createPhysicalSlotRequestBulk(new SlotRequestId(), new SlotRequestId());
+
+               addOneSlot();
+
+               assertThat(isFulfillable(bulk), is(false));
+       }
+
+       @Test
+       public void testBulkUnfulfillableWithSlotAlreadyAssignedToBulk() {
+               final SlotRequestId slotRequestId = new SlotRequestId();
+               final PhysicalSlotRequestBulkImpl bulk = 
createPhysicalSlotRequestBulk(slotRequestId, new SlotRequestId());
+
+               final PhysicalSlot slot = addOneSlot();
+
+               bulk.markRequestFulfilled(slotRequestId, 
slot.getAllocationId());
+
+               assertThat(isFulfillable(bulk), is(false));
+       }
+
+       @Test
+       public void testBulkUnfulfillableWithSlotOccupiedIndefinitely() {
+               final PhysicalSlotRequestBulk bulk = 
createPhysicalSlotRequestBulk(new SlotRequestId(), new SlotRequestId());
+
+               final PhysicalSlot slot1 = addOneSlot();
+               addOneSlot();
+
+               occupyPhysicalSlot(slot1, true);
+
+               assertThat(isFulfillable(bulk), is(false));
+       }
+
+       @Test
+       public void testBulkFulfillableWithSlotOccupiedTemporarily() {
+               final PhysicalSlotRequestBulk bulk = 
createPhysicalSlotRequestBulk(new SlotRequestId(), new SlotRequestId());
+
+               final PhysicalSlot slot1 = addOneSlot();
+               addOneSlot();
+
+               occupyPhysicalSlot(slot1, false);
+
+               assertThat(isFulfillable(bulk), is(true));
+       }
+
+       private PhysicalSlotRequestBulkWithTimestamp 
createPhysicalSlotRequestBulkWithTimestamp(SlotRequestId... slotRequestIds) {
+               final PhysicalSlotRequestBulkWithTimestamp bulk = new 
PhysicalSlotRequestBulkWithTimestamp(createPhysicalSlotRequestBulk(slotRequestIds));
+               bulk.markUnfulfillable(clock.relativeTimeMillis());
+               return bulk;
+       }
+
+       private static PhysicalSlotRequestBulkImpl 
createPhysicalSlotRequestBulk(SlotRequestId... slotRequestIds) {
+               final TestingPhysicalSlotRequestBulkBuilder builder = 
TestingPhysicalSlotRequestBulkBuilder.newBuilder();
+               for (SlotRequestId slotRequestId : slotRequestIds) {
+                       builder.addPendingRequest(slotRequestId, 
ResourceProfile.UNKNOWN);
+               }
+               return builder.buildPhysicalSlotRequestBulkImpl();
+       }
+
+       private PhysicalSlotRequestBulk 
createPhysicalSlotRequestBulkWithCancellationFuture(
+               CompletableFuture<SlotRequestId> cancellationFuture,
+               SlotRequestId slotRequestId) {
+               return TestingPhysicalSlotRequestBulkBuilder
+                       .newBuilder()
+                       .addPendingRequest(slotRequestId, 
ResourceProfile.UNKNOWN)
+                       .setCanceller((id, t) -> 
cancellationFuture.complete(id))
+                       .buildPhysicalSlotRequestBulkImpl();
+       }
+
+       private PhysicalSlot addOneSlot() {
+               final PhysicalSlot slot = createPhysicalSlot();
+               slots.add(slot);

Review comment:
       I think it is not guaranteed that changes to `slots` are visible within 
the `bulkChecker`, which runs on the `mainThreadExecutor`, since the `Set` 
implementation used is not thread-safe.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.util.DualKeyLinkedMap;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
+       private static final Logger LOG = 
LoggerFactory.getLogger(SharedSlot.class);
+
+       private final SlotRequestId physicalSlotRequestId;
+
+       private final ResourceProfile physicalSlotResourceProfile;
+
+       private final ExecutionSlotSharingGroup executionSlotSharingGroup;
+
+       private final CompletableFuture<PhysicalSlot> slotContextFuture;
+
+       private final DualKeyLinkedMap<ExecutionVertexID, SlotRequestId, 
CompletableFuture<SingleLogicalSlot>> requestedLogicalSlots;
+
+       private final boolean slotWillBeOccupiedIndefinitely;
+
+       private final Consumer<ExecutionSlotSharingGroup> releaseCallback;
+
+       SharedSlot(
+                       SlotRequestId physicalSlotRequestId,
+                       ResourceProfile physicalSlotResourceProfile,
+                       ExecutionSlotSharingGroup executionSlotSharingGroup,
+                       CompletableFuture<PhysicalSlot> slotContextFuture,
+                       boolean slotWillBeOccupiedIndefinitely,
+                       Consumer<ExecutionSlotSharingGroup> releaseCallback) {
+               this.physicalSlotRequestId = physicalSlotRequestId;
+               this.physicalSlotResourceProfile = physicalSlotResourceProfile;
+               this.executionSlotSharingGroup = executionSlotSharingGroup;
+               this.slotContextFuture = 
slotContextFuture.thenApply(physicalSlot -> {
+                       Preconditions.checkState(
+                               physicalSlot.tryAssignPayload(this),
+                               "Unexpected physical slot payload assignment 
failure!");
+                       return physicalSlot;
+               });
+               this.requestedLogicalSlots = new 
DualKeyLinkedMap<>(executionSlotSharingGroup.getExecutionVertexIds().size());
+               this.slotWillBeOccupiedIndefinitely = 
slotWillBeOccupiedIndefinitely;
+               this.releaseCallback = releaseCallback;
+       }
+
+       SlotRequestId getPhysicalSlotRequestId() {
+               return physicalSlotRequestId;
+       }
+
+       ResourceProfile getPhysicalSlotResourceProfile() {
+               return physicalSlotResourceProfile;
+       }
+
+       CompletableFuture<PhysicalSlot> getSlotContextFuture() {
+               return slotContextFuture;
+       }
+
+       CompletableFuture<LogicalSlot> allocateLogicalSlot(ExecutionVertexID 
executionVertexId) {
+               
Preconditions.checkArgument(executionSlotSharingGroup.getExecutionVertexIds().contains(executionVertexId));
+               CompletableFuture<SingleLogicalSlot> logicalSlotFuture = 
requestedLogicalSlots.getValueByKeyA(executionVertexId);
+               if (logicalSlotFuture != null) {
+                       LOG.debug("Request for {} already exists", 
getLogicalSlotString(executionVertexId));
+               } else {
+                       logicalSlotFuture = 
allocateNonExistentLogicalSlot(executionVertexId);
+               }
+               return logicalSlotFuture.thenApply(Function.identity());
+       }
+
+       private CompletableFuture<SingleLogicalSlot> 
allocateNonExistentLogicalSlot(ExecutionVertexID executionVertexId) {
+               CompletableFuture<SingleLogicalSlot> logicalSlotFuture;
+               SlotRequestId logicalSlotRequestId = new SlotRequestId();
+               String logMessageBase = 
getLogicalSlotString(logicalSlotRequestId, executionVertexId);
+               LOG.debug("Request a {}", logMessageBase);
+
+               logicalSlotFuture = slotContextFuture
+                       .thenApply(physicalSlot -> {
+                               LOG.debug("Allocated {}", logMessageBase);
+                               return createLogicalSlot(physicalSlot, 
logicalSlotRequestId);
+                       });
+               requestedLogicalSlots.put(executionVertexId, 
logicalSlotRequestId, logicalSlotFuture);
+
+               // If the physical slot request fails (slotContextFuture), it 
will also fail the logicalSlotFuture.
+               // Therefore, the next `exceptionally` callback will 
cancelLogicalSlotRequest and do the cleanup

Review comment:
       `exceptionally` calls `removeLogicalSlotRequest` and not 
`cancelLogicalSlotRequest`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharingPhysicalSlotRequestBulk.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulk;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SharingPhysicalSlotRequestBulk implements PhysicalSlotRequestBulk {
+       private final Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> 
executions;
+
+       private final Map<ExecutionSlotSharingGroup, ResourceProfile> 
pendingRequests;
+
+       private final Map<ExecutionSlotSharingGroup, AllocationID> 
fulfilledRequests;
+
+       private final BiConsumer<ExecutionVertexID, Throwable> 
logicalSlotRequestCanceller;
+
+       SharingPhysicalSlotRequestBulk(
+                       Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> 
executions,
+                       Map<ExecutionSlotSharingGroup, ResourceProfile> 
pendingRequests,
+                       Map<ExecutionSlotSharingGroup, AllocationID> 
fulfilledRequests,
+                       BiConsumer<ExecutionVertexID, Throwable> 
logicalSlotRequestCanceller) {
+               this.executions = checkNotNull(executions);
+               this.pendingRequests = checkNotNull(pendingRequests);
+               this.fulfilledRequests = checkNotNull(fulfilledRequests);
+               this.logicalSlotRequestCanceller = 
checkNotNull(logicalSlotRequestCanceller);
+       }
+
+       @Override
+       public Collection<ResourceProfile> getPendingRequests() {
+               return pendingRequests.values();
+       }
+
+       @Override
+       public Set<AllocationID> getAllocationIdsOfFulfilledRequests() {
+               return new HashSet<>(fulfilledRequests.values());
+       }
+
+       @Override
+       public void cancel(Throwable cause) {
+               // pending requests must be canceled first otherwise they might 
be fulfilled by
+               // allocated slots released from this bulk
+               Stream
+                       .concat(
+                               pendingRequests.keySet().stream(),
+                               fulfilledRequests.keySet().stream())
+                       .flatMap(group -> executions.get(group).stream())
+                       .forEach(id -> logicalSlotRequestCanceller.accept(id, 
cause));
+       }
+
+       void markFulfilled(ExecutionSlotSharingGroup group, AllocationID 
allocationID) {
+               pendingRequests.remove(group);
+               fulfilledRequests.put(group, allocationID);
+       }
+
+       void clear() {
+               pendingRequests.clear();

Review comment:
       Why do we only clear the `pendingRequests` and not also the fulfilled 
slots? Maybe state the contract of this method in the `JavaDocs`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -161,158 +173,66 @@ private SharedSlot getOrAllocateSharedSlot(
                                        .thenCompose(slotProfile -> 
slotProvider.allocatePhysicalSlot(
                                                new 
PhysicalSlotRequest(physicalSlotRequestId, slotProfile, 
slotWillBeOccupiedIndefinitely)))
                                        
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
-                               return new SharedSlot(physicalSlotRequestId, 
physicalSlotResourceProfile, group, physicalSlotFuture);
+                               return new SharedSlot(
+                                       physicalSlotRequestId,
+                                       physicalSlotResourceProfile,
+                                       group,
+                                       physicalSlotFuture,
+                                       slotWillBeOccupiedIndefinitely,
+                                       this::releaseSharedSlot);
                        });
        }
 
+       private void releaseSharedSlot(ExecutionSlotSharingGroup 
executionSlotSharingGroup) {
+               SharedSlot slot = sharedSlots.remove(executionSlotSharingGroup);
+               if (slot != null) {
+                       slotProvider.cancelSlotRequest(
+                               slot.getPhysicalSlotRequestId(),
+                               new FlinkException("Slot is being returned from 
SlotSharingExecutionSlotAllocator."));
+               } else {
+                       LOG.debug("There is no slot for 
ExecutionSlotSharingGroup {} to release", executionSlotSharingGroup);
+               }
+       }
+
        private ResourceProfile 
getPhysicalSlotResourceProfile(ExecutionSlotSharingGroup 
executionSlotSharingGroup) {
                return executionSlotSharingGroup
                        .getExecutionVertexIds()
                        .stream()
                        .reduce(ResourceProfile.ZERO, (r, e) -> 
r.merge(resourceProfileRetriever.apply(e)), ResourceProfile::merge);
        }
 
-       private class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
-               private final SlotRequestId physicalSlotRequestId;
-
-               private final ResourceProfile physicalSlotResourceProfile;
-
-               private final ExecutionSlotSharingGroup 
executionSlotSharingGroup;
-
-               private final CompletableFuture<PhysicalSlot> slotContextFuture;
-
-               private final DualKeyLinkedMap<ExecutionVertexID, 
SlotRequestId, CompletableFuture<SingleLogicalSlot>> requestedLogicalSlots;
-
-               private SharedSlot(
-                               SlotRequestId physicalSlotRequestId,
-                               ResourceProfile physicalSlotResourceProfile,
-                               ExecutionSlotSharingGroup 
executionSlotSharingGroup,
-                               CompletableFuture<PhysicalSlot> 
slotContextFuture) {
-                       this.physicalSlotRequestId = physicalSlotRequestId;
-                       this.physicalSlotResourceProfile = 
physicalSlotResourceProfile;
-                       this.executionSlotSharingGroup = 
executionSlotSharingGroup;
-                       this.slotContextFuture = 
slotContextFuture.thenApply(physicalSlot -> {
-                               Preconditions.checkState(
-                                       physicalSlot.tryAssignPayload(this),
-                                       "Unexpected physical slot payload 
assignment failure!");
-                               return physicalSlot;
+       private SharingPhysicalSlotRequestBulk 
createBulk(Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executions) {
+               Map<ExecutionSlotSharingGroup, ResourceProfile> pendingRequests 
= executions
+                       .keySet()
+                       .stream()
+                       .collect(Collectors.toMap(
+                               group -> group,
+                               group -> 
sharedSlots.get(group).getPhysicalSlotResourceProfile()
+                       ));
+               Map<ExecutionSlotSharingGroup, AllocationID> fulfilledRequests 
= new HashMap<>();
+               SharingPhysicalSlotRequestBulk bulk = new 
SharingPhysicalSlotRequestBulk(
+                       executions,
+                       pendingRequests,
+                       fulfilledRequests,

Review comment:
       Why are we passing in an empty map here?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
##########
@@ -223,34 +232,52 @@ public void 
testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() {
                        });
        }
 
-       private static void testLogicalSlotRequestCancellation(
+       @Test
+       public void 
testCompletedLogicalSlotCancelationDoesNotCancelPhysicalSlotRequestAndDoesNotRemoveSharedSlot()
 {
+               // physical slot request is completed and completes logical 
requests
+               testLogicalSlotRequestCancellationOrRelease(
+                       false,
+                       false,
+                       (context, assignment) -> {
+                               
context.getAllocator().cancel(assignment.getExecutionVertexId());
+                               try {
+                                       assignment.getLogicalSlotFuture().get();
+                               } catch (InterruptedException | 
ExecutionException e) {
+                                       throw new 
FlinkRuntimeException("Unexpected", e);
+                               }
+                       });
+       }
+
+       private static void testLogicalSlotRequestCancellationOrRelease(
                        boolean completePhysicalSlotFutureManually,
-                       BiConsumer<AllocationContext, 
SlotExecutionVertexAssignment> cancelAction) {
+                       boolean cancelsPhysicalSlotRequestAndRemovesSharedSlot,
+                       BiConsumer<AllocationContext, 
SlotExecutionVertexAssignment> cancelOrReleaseAction) {

Review comment:
       Maybe make this a `BiConsumerWithException`, then we don't need the 
try-catch blocks.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
##########
@@ -87,9 +86,12 @@
                        SlotRequestId slotRequestId,
                        ResourceProfile resourceProfile,
                        boolean willSlotBeOccupiedIndefinitely) {
-               return willSlotBeOccupiedIndefinitely ?
-                       slotPool.requestNewAllocatedSlot(slotRequestId, 
resourceProfile, null) :
-                       slotPool.requestNewAllocatedBatchSlot(slotRequestId, 
resourceProfile);
+               if (willSlotBeOccupiedIndefinitely) {
+                       return slotPool.requestNewAllocatedSlot(slotRequestId, 
resourceProfile, null);
+               } else {
+                       slotPool.disableBatchSlotRequestTimeoutCheck();

Review comment:
       Hmm, one way to solve this problem is to not use this class in the 
`SchedulerImpl`, right?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharingPhysicalSlotRequestBulk.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulk;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SharingPhysicalSlotRequestBulk implements PhysicalSlotRequestBulk {

Review comment:
       This class is lacking tests.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -161,158 +173,66 @@ private SharedSlot getOrAllocateSharedSlot(
                                        .thenCompose(slotProfile -> 
slotProvider.allocatePhysicalSlot(
                                                new 
PhysicalSlotRequest(physicalSlotRequestId, slotProfile, 
slotWillBeOccupiedIndefinitely)))
                                        
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
-                               return new SharedSlot(physicalSlotRequestId, 
physicalSlotResourceProfile, group, physicalSlotFuture);
+                               return new SharedSlot(
+                                       physicalSlotRequestId,
+                                       physicalSlotResourceProfile,
+                                       group,
+                                       physicalSlotFuture,
+                                       slotWillBeOccupiedIndefinitely,
+                                       this::releaseSharedSlot);
                        });
        }
 
+       private void releaseSharedSlot(ExecutionSlotSharingGroup 
executionSlotSharingGroup) {
+               SharedSlot slot = sharedSlots.remove(executionSlotSharingGroup);
+               if (slot != null) {
+                       slotProvider.cancelSlotRequest(
+                               slot.getPhysicalSlotRequestId(),
+                               new FlinkException("Slot is being returned from 
SlotSharingExecutionSlotAllocator."));
+               } else {
+                       LOG.debug("There is no slot for 
ExecutionSlotSharingGroup {} to release", executionSlotSharingGroup);

Review comment:
       What are the conditions under which this branch can happen? Can't we say 
that there must be a `SharedSlot` when this method is called? Making this 
stricter could allow us to catch other programming errors faster.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.util.DualKeyLinkedMap;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+class SharedSlot implements SlotOwner, PhysicalSlot.Payload {

Review comment:
       This class is lacking tests.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.util.DualKeyLinkedMap;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
+       private static final Logger LOG = 
LoggerFactory.getLogger(SharedSlot.class);
+
+       private final SlotRequestId physicalSlotRequestId;
+
+       private final ResourceProfile physicalSlotResourceProfile;
+
+       private final ExecutionSlotSharingGroup executionSlotSharingGroup;
+
+       private final CompletableFuture<PhysicalSlot> slotContextFuture;
+
+       private final DualKeyLinkedMap<ExecutionVertexID, SlotRequestId, 
CompletableFuture<SingleLogicalSlot>> requestedLogicalSlots;
+
+       private final boolean slotWillBeOccupiedIndefinitely;
+
+       private final Consumer<ExecutionSlotSharingGroup> releaseCallback;
+
+       SharedSlot(
+                       SlotRequestId physicalSlotRequestId,
+                       ResourceProfile physicalSlotResourceProfile,
+                       ExecutionSlotSharingGroup executionSlotSharingGroup,
+                       CompletableFuture<PhysicalSlot> slotContextFuture,
+                       boolean slotWillBeOccupiedIndefinitely,
+                       Consumer<ExecutionSlotSharingGroup> releaseCallback) {
+               this.physicalSlotRequestId = physicalSlotRequestId;
+               this.physicalSlotResourceProfile = physicalSlotResourceProfile;
+               this.executionSlotSharingGroup = executionSlotSharingGroup;
+               this.slotContextFuture = 
slotContextFuture.thenApply(physicalSlot -> {
+                       Preconditions.checkState(
+                               physicalSlot.tryAssignPayload(this),
+                               "Unexpected physical slot payload assignment 
failure!");
+                       return physicalSlot;
+               });
+               this.requestedLogicalSlots = new 
DualKeyLinkedMap<>(executionSlotSharingGroup.getExecutionVertexIds().size());
+               this.slotWillBeOccupiedIndefinitely = 
slotWillBeOccupiedIndefinitely;
+               this.releaseCallback = releaseCallback;
+       }
+
+       SlotRequestId getPhysicalSlotRequestId() {
+               return physicalSlotRequestId;
+       }
+
+       ResourceProfile getPhysicalSlotResourceProfile() {
+               return physicalSlotResourceProfile;
+       }
+
+       CompletableFuture<PhysicalSlot> getSlotContextFuture() {
+               return slotContextFuture;
+       }
+
+       CompletableFuture<LogicalSlot> allocateLogicalSlot(ExecutionVertexID 
executionVertexId) {
+               
Preconditions.checkArgument(executionSlotSharingGroup.getExecutionVertexIds().contains(executionVertexId));

Review comment:
       An error message could be helpful.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharingPhysicalSlotRequestBulk.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulk;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SharingPhysicalSlotRequestBulk implements PhysicalSlotRequestBulk {

Review comment:
       A bit of `JavaDoc` explaining the purpose of this class could be helpful.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharingPhysicalSlotRequestBulk.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulk;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SharingPhysicalSlotRequestBulk implements PhysicalSlotRequestBulk {
+       private final Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> 
executions;
+
+       private final Map<ExecutionSlotSharingGroup, ResourceProfile> 
pendingRequests;
+
+       private final Map<ExecutionSlotSharingGroup, AllocationID> 
fulfilledRequests;
+
+       private final BiConsumer<ExecutionVertexID, Throwable> 
logicalSlotRequestCanceller;
+
+       SharingPhysicalSlotRequestBulk(
+                       Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> 
executions,
+                       Map<ExecutionSlotSharingGroup, ResourceProfile> 
pendingRequests,
+                       Map<ExecutionSlotSharingGroup, AllocationID> 
fulfilledRequests,
+                       BiConsumer<ExecutionVertexID, Throwable> 
logicalSlotRequestCanceller) {
+               this.executions = checkNotNull(executions);
+               this.pendingRequests = checkNotNull(pendingRequests);
+               this.fulfilledRequests = checkNotNull(fulfilledRequests);
+               this.logicalSlotRequestCanceller = 
checkNotNull(logicalSlotRequestCanceller);
+       }
+
+       @Override
+       public Collection<ResourceProfile> getPendingRequests() {
+               return pendingRequests.values();
+       }
+
+       @Override
+       public Set<AllocationID> getAllocationIdsOfFulfilledRequests() {
+               return new HashSet<>(fulfilledRequests.values());
+       }
+
+       @Override
+       public void cancel(Throwable cause) {
+               // pending requests must be canceled first otherwise they might 
be fulfilled by
+               // allocated slots released from this bulk
+               Stream
+                       .concat(
+                               pendingRequests.keySet().stream(),
+                               fulfilledRequests.keySet().stream())
+                       .flatMap(group -> executions.get(group).stream())
+                       .forEach(id -> logicalSlotRequestCanceller.accept(id, 
cause));

Review comment:
       Moreover, that way the contract you have stated in the comment above 
would also be enforced more explicitly.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to