zentol commented on a change in pull request #13722:
URL: https://github.com/apache/flink/pull/13722#discussion_r516853155



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
##########
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link DeclarativeSlotPool} implementation.
+ *
+ * <p>The implementation collects the current resource requirements and 
declares them
+ * at the ResourceManager. Whenever new slots are offered, the slot pool 
compares the
+ * offered slots to the set of available and required resources and only 
accepts those
+ * slots which are required.
+ *
+ * <p>Slots which are released won't be returned directly to their owners. 
Instead,
+ * the slot pool implementation will only return them after the 
idleSlotTimeout has
+ * been exceeded by a free slot.
+ *
+ * <p>The slot pool will call {@link #notifyNewSlots} whenever newly offered 
slots are
+ * accepted or if an allocated slot should become free after it is being
+ * {@link #freeReservedSlot released}.
+ */
+public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);
+
+       private final Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements;
+
+       private final Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots;
+
+       private final Time idleSlotTimeout;
+       private final Time rpcTimeout;
+
+       private final AllocatedSlotPool slotPool;
+
+       private final Map<AllocationID, ResourceProfile> 
slotToRequirementProfileMappings;
+
+       private ResourceCounter totalResourceRequirements;
+
+       private ResourceCounter fulfilledResourceRequirements;
+
+       public DefaultDeclarativeSlotPool(
+               AllocatedSlotPool slotPool,
+               Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
+               Consumer<? super Collection<? extends PhysicalSlot>> 
notifyNewSlots,
+               Time idleSlotTimeout,
+               Time rpcTimeout) {
+
+               this.slotPool = slotPool;
+               this.notifyNewResourceRequirements = 
notifyNewResourceRequirements;
+               this.notifyNewSlots = notifyNewSlots;
+               this.idleSlotTimeout = idleSlotTimeout;
+               this.rpcTimeout = rpcTimeout;
+               this.totalResourceRequirements = ResourceCounter.empty();
+               this.fulfilledResourceRequirements = ResourceCounter.empty();
+               this.slotToRequirementProfileMappings = new HashMap<>();
+       }
+
+       @Override
+       public void increaseResourceRequirementsBy(ResourceCounter increment) {
+               totalResourceRequirements = 
totalResourceRequirements.add(increment);
+
+               declareResourceRequirements();
+       }
+
+       @Override
+       public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
+               totalResourceRequirements = 
totalResourceRequirements.subtract(decrement);
+
+               declareResourceRequirements();
+       }
+
+       private void declareResourceRequirements() {
+               notifyNewResourceRequirements.accept(getResourceRequirements());
+       }
+
+       @Override
+       public Collection<ResourceRequirement> getResourceRequirements() {
+               final Collection<ResourceRequirement> 
currentResourceRequirements = new ArrayList<>();
+
+               for (Map.Entry<ResourceProfile, Integer> resourceRequirement : 
totalResourceRequirements.getResourcesWithCount()) {
+                       
currentResourceRequirements.add(ResourceRequirement.create(resourceRequirement.getKey(),
 resourceRequirement.getValue()));
+               }
+
+               return currentResourceRequirements;
+       }
+
+       @Override
+       public Collection<SlotOffer> offerSlots(
+               Collection<? extends SlotOffer> offers,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway,
+               long currentTime) {
+
+               LOG.debug("Received {} slot offers from TaskExecutor {}.", 
offers.size(), taskManagerLocation);
+               final Collection<SlotOffer> acceptedSlotOffers = new 
ArrayList<>();
+               final Collection<SlotOffer> candidates = new ArrayList<>();
+
+               // filter out already accepted offers
+               for (SlotOffer offer : offers) {
+                       final AllocationID allocationId = 
offer.getAllocationId();
+                       if (slotPool.containsSlot(allocationId)) {
+                               acceptedSlotOffers.add(offer);
+                       } else {
+                               candidates.add(offer);
+                       }
+               }
+
+               final Collection<SlotOfferMatching> matchings = 
matchOffersWithOutstandingRequirements(candidates);
+
+               final Collection<AllocatedSlot> acceptedSlots = new 
ArrayList<>();
+               ResourceCounter acceptedResources = ResourceCounter.empty();
+
+               for (SlotOfferMatching matching : matchings) {
+                       if (matching.getMatching().isPresent()) {
+                               final ResourceProfile matchedResourceProfile = 
matching.getMatching().get();
+
+                               final AllocatedSlot allocatedSlot = 
createAllocatedSlot(
+                                       matching.getSlotOffer(),
+                                       taskManagerLocation,
+                                       taskManagerGateway);
+
+                               acceptedSlots.add(allocatedSlot);
+                               acceptedSlotOffers.add(matching.getSlotOffer());
+
+                               acceptedResources = 
acceptedResources.add(matchedResourceProfile, 1);
+
+                               // store the ResourceProfile against which the 
given slot has matched for future book-keeping
+                               
slotToRequirementProfileMappings.put(allocatedSlot.getAllocationId(), 
matchedResourceProfile);
+                       }
+               }
+
+               slotPool.addSlots(acceptedSlots, currentTime);
+               increaseAvailableResources(acceptedResources);
+
+               if (!acceptedSlots.isEmpty()) {
+                       notifyNewSlots.accept(acceptedSlots);
+               }
+
+               return acceptedSlotOffers;
+       }
+
+       private Collection<SlotOfferMatching> 
matchOffersWithOutstandingRequirements(Collection<SlotOffer> slotOffers) {
+               ResourceCounter unfulfilledResources = 
calculateUnfulfilledResources();
+
+               final Collection<SlotOfferMatching> matching = new 
ArrayList<>();
+
+               for (SlotOffer slotOffer : slotOffers) {
+                       ResourceProfile matchingResourceProfile = null;
+
+                       if 
(unfulfilledResources.containsResource(slotOffer.getResourceProfile())) {
+                               unfulfilledResources = 
unfulfilledResources.subtract(slotOffer.getResourceProfile(), 1);
+
+                               // use the profile of the slot offer since it 
is a direct match against the requirement
+                               matchingResourceProfile = 
slotOffer.getResourceProfile();
+                       } else {
+                               for (ResourceProfile unfulfilledResource : 
unfulfilledResources.getResources()) {
+                                       if 
(slotOffer.getResourceProfile().isMatching(unfulfilledResource)) {
+                                               matchingResourceProfile = 
unfulfilledResource;
+                                               break;
+                                       }
+                               }
+                       }
+
+                       final SlotOfferMatching slotOfferMatching;
+                       if (matchingResourceProfile != null) {
+                               slotOfferMatching = 
SlotOfferMatching.createMatching(slotOffer, matchingResourceProfile);
+                       } else {
+                               slotOfferMatching = 
SlotOfferMatching.createMismatch(slotOffer);
+                       }
+
+                       matching.add(slotOfferMatching);
+               }
+
+               return matching;
+       }
+
+       @VisibleForTesting
+       ResourceCounter calculateUnfulfilledResources() {
+               return 
totalResourceRequirements.subtract(fulfilledResourceRequirements);
+       }
+
+       private AllocatedSlot createAllocatedSlot(
+               SlotOffer slotOffer,
+               TaskManagerLocation taskManagerLocation,
+               TaskManagerGateway taskManagerGateway) {
+               return new AllocatedSlot(
+                       slotOffer.getAllocationId(),
+                       taskManagerLocation,
+                       slotOffer.getSlotIndex(),
+                       slotOffer.getResourceProfile(),
+                       taskManagerGateway);
+       }
+
+       private void increaseAvailableResources(ResourceCounter 
acceptedResources) {
+               fulfilledResourceRequirements = 
fulfilledResourceRequirements.add(acceptedResources);
+       }
+
+       @Nonnull
+       private ResourceProfile getMatchingResourceProfile(AllocationID 
allocationId) {
+               return 
Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId), 
"No matching resource profile found for %s", allocationId);
+       }
+
+       @Override
+       public PhysicalSlot reserveFreeSlot(AllocationID allocationId, 
ResourceProfile requiredSlotProfile) {
+               final AllocatedSlot allocatedSlot = 
slotPool.reserveFreeSlot(allocationId);
+
+               
Preconditions.checkState(allocatedSlot.getResourceProfile().isMatching(requiredSlotProfile),
 "");
+
+               ResourceProfile previouslyMatchedResourceProfile = 
Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId));
+
+               updateSlotToRequirementProfileMapping(allocationId, 
requiredSlotProfile);

Review comment:
       I'm not sure if we can get _rid_ of it.
   
   The information that _this_ slot was used to fulfill _that_ requirement is 
used for so many things; be it matching slot offers against outstanding 
requirements, figuring out whether a slot is eligible for an idleness timeout, 
or updating the fulfilled requirements if a slot is released.
   It seems impossible to remove this mapping without removing everything that 
makes this pool more than a wrapper around the `AllocatedSlotPool`.
   
   At the end of the day we need this information _somewhere_ anyway; be it in 
the slot pool, or the new scheduler so it could make sure to update the 
requirements.
   Given that the declarative slot pool is all about managing slots with respect 
to requirements, it seems fitting that it contains such a mapping.
   
   Maybe this is just a naming/structural issue? What if the 
DeclarativeSlotPool were not actually a pool, but a requirement manager?
   Slot offers and releases would be issued directly against the 
AllocatedSlotPool, which informs the declarative slot pool, which then takes 
the appropriate steps of adjusting requirements etc.
   Basically, instead of removing the mapping portion out of the slot pool, 
remove the slot pool portion from it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to